# 📊 Dataset Maker by Hollowstrawberry

This is based on the work of [Kohya-ss](https://github.com/kohya-ss/sd-scripts) and [Linaqruf](https://colab.research.google.com/github/Linaqruf/kohya-trainer/blob/main/kohya-LoRA-dreambooth.ipynb). Thank you!

### ⭕ Disclaimer
The purpose of this document is to research bleeding-edge technologies in the field of machine learning inference.  
Please read and follow the [Google Colab guidelines](https://research.google.com/colaboratory/faq.html) and its [Terms of Service](https://research.google.com/colaboratory/tos_v3.html).

In [None]:
import os
from IPython import get_ipython
from IPython.display import display, Markdown

COLAB = True

if COLAB:
  from google.colab.output import clear as clear_output
else:
  from IPython.display import clear_output

#@title ## 🚩 Empezar aquí

#@markdown ### 1️⃣  Inicio
#@markdown Esta celda cargará algunos requerimientos y creará las carpetas correspondientes en tu Google Drive. <p>
#@markdown Tu nombre de proyecto será la carpeta donde trabajaremos. No se permiten espacios.
nombre_proyecto = "" #@param {type:"string"}
project_name = nombre_proyecto.strip()
#@markdown La estructura de carpetas no importa y es por comodidad. Asegúrate de siempre elegir la misma. Me gusta organizar por proyecto.
estructura_de_carpetas = "Organizar por proyecto (MyDrive/Loras/nombre_proyecto/dataset)" #@param ["Organizar por categoría (MyDrive/lora_training/datasets/nombre_proyecto)", "Organizar por proyecto (MyDrive/Loras/nombre_proyecto/dataset)"]
folder_structure = estructura_de_carpetas

# A valid name is non-empty, has none of the forbidden characters, contains at most one "/"
# and has no empty path segment (a leading "/" would make os.path.join discard the Drive root).
if (
  not project_name
  or any(c in project_name for c in " .()\"'\\")
  or project_name.count("/") > 1
  or "" in project_name.split("/")
):
  print("Por favor elige un nombre válido.")
else:
  if COLAB and not os.path.exists('/content/drive'):
    from google.colab import drive
    print("📂 Conectando a Google Drive...")
    drive.mount('/content/drive')

  # "base/sub" names are split into the project folder and a dataset subfolder.
  if "/" in project_name:
    project_base, project_subfolder = project_name.rsplit("/", 1)
  else:
    project_base = project_subfolder = project_name

  # Expand "~" explicitly: os.makedirs would otherwise create a directory literally named "~".
  root_dir = "/content" if COLAB else os.path.expanduser("~/Loras")
  deps_dir = os.path.join(root_dir, "deps")

  if "/Loras" in folder_structure:
    # Organized by project: <main>/<project>/dataset[/<subfolder>]
    main_dir      = os.path.join(root_dir, "drive/MyDrive/Loras") if COLAB else root_dir
    config_folder = os.path.join(main_dir, project_base)
    images_folder = os.path.join(main_dir, project_base, "dataset")
    if "/" in project_name:
      images_folder = os.path.join(images_folder, project_subfolder)
  else:
    # Organized by category: <main>/config/<project> and <main>/datasets/<project>
    main_dir      = os.path.join(root_dir, "drive/MyDrive/lora_training") if COLAB else root_dir
    config_folder = os.path.join(main_dir, "config", project_name)
    images_folder = os.path.join(main_dir, "datasets", project_name)

  for needed_dir in [main_dir, deps_dir, images_folder, config_folder]:
    os.makedirs(needed_dir, exist_ok=True)

  print(f"✅ ¡Proyecto {project_name} listo!")
  # Later cells check for this name in globals() before running.
  step1_installed_flag = True

In [None]:
# @title ## 1. Instalar dependencias
import os
from pathlib import Path

# Where the trainer is checked out, and the executables of the virtualenv inside it.
root_path = Path("/content")
trainer_dir = root_path / "trainer"

venv_pip = trainer_dir / "sd_scripts/venv/bin/pip"
venv_python = trainer_dir / "sd_scripts/venv/bin/python"

# @markdown Execute the cell to install the trainer

# Progress flags; install_trainer() switches them on as it advances.
installed_dependencies = first_step_done = False

def install_trainer():
  global installed_dependencies, first_step_done

  print("Installing trainer...")
  !apt -y update -qq
  !apt install -y python3.10-venv aria2 -qq

  installed_dependencies = True

  !git clone https://github.com/derrian-distro/LoRA_Easy_Training_scripts_Backend {trainer_dir}

  !chmod 755 /content/trainer/colab_install.sh
  os.chdir(trainer_dir)
  !./colab_install.sh

  os.chdir(root_path)

  first_step_done = True
  print("Installation complete!")

def download_custom_wd_tagger():
  # Replace the tagger script inside the trainer checkout with a custom version
  # downloaded from GitHub. The script location is published as the module-level
  # `wd_path`, which the tagging cell passes to the virtualenv's Python.
  global wd_path

  wd_path = trainer_dir.joinpath("sd_scripts/finetune/tag_images_by_wd14_tagger.py")

  print("Downloading tagger script that allows v3 taggers...")
  # Delete the existing script, then download the replacement to the same path.
  !rm "{wd_path}"
  !aria2c "https://raw.githubusercontent.com/Jelosus2/Lora_Easy_Training_Colab/main/custom/tag_images_by_wd14_tagger.py" --console-log-level=warn -c -s 16 -x 16 -k 10M -d / -o "{wd_path}"

def fix_scripts_logging():
  # Uninstall the `rich` package from the trainer's virtualenv; `yes |` answers
  # pip's confirmation prompt automatically.
  print("Fixing sd_scripts logging issue on colab...")
  !yes | {venv_pip} uninstall rich

def main():
  """Run each installation step in order, then report completion."""
  installation_steps = (
    install_trainer,
    download_custom_wd_tagger,
    fix_scripts_logging,
  )
  for run_step in installation_steps:
    run_step()
  print("Finished installation!")

# Run the whole installation. Any Python exception is reported and the step is
# flagged as not done so later cells refuse to continue.
# NOTE(review): failures of the `!` shell commands presumably do not raise here — confirm.
try:
  main()
except Exception as e:
  print(f"Error intalling the trainer!\n{e}")
  first_step_done = False

In [None]:
# @title ## 2. Ruta de tu proyecto
from pathlib import Path
from google.colab import drive

# The installation cell defines these paths; recreate them when it has not completed.
if not globals().get("first_step_done"):
  root_path = Path("/content")
  trainer_dir = root_path / "trainer"

# Working directories, all located directly under the root path.
drive_dir = root_path / "drive/MyDrive"
pretrained_model_dir = root_path / "pretrained_model"
vae_dir = root_path / "vae"
tagger_models_dir = root_path / "tagger_models"

# @markdown La ruta base para su proyecto. Asegúrese de que se pueda utilizar como nombre de carpeta.
project_path = "/content/drive/MyDrive/Loras/tu_projecto" # @param {type: "string"}
# @markdown Especifique el nombre de los directorios. Si tiene varios conjuntos de datos, sepárelos con una coma `(,)` por ejemplo: **dataset1, dataset2, ...**

# @markdown El directorio donde se almacenarán los resultados del entrenamiento.
output_dir_name = "output" # @param {type: "string"}
# @markdown El directorio donde se ubicarán sus conjuntos de datos.
dataset_dir_name = "dataset" # @param {type: "string"}
# @markdown Utilice Drive para almacenar todos los archivos y directorios
use_drive = True # @param {type: "boolean"}

# Spaces in the project path and the output directory name become underscores.
project_path, output_dir_name = (
  value.replace(" ", "_") for value in (project_path, output_dir_name)
)

second_step_done = False

def is_valid_folder_name(folder_name: str) -> bool:
  """Return True when `folder_name` contains none of the forbidden characters.

  The forbidden characters are the ones listed in `invalid_characters` below.
  An empty name contains none of them and is therefore accepted.
  """
  # Raw string: a plain literal would contain the invalid escape sequence "\|",
  # which newer Python versions warn about.
  invalid_characters = r'<>:"/\|?*'

  return not any(char in invalid_characters for char in folder_name)

def mount_drive_dir() -> Path:
  """Return the project's base directory, mounting Google Drive first if needed.

  With `use_drive` off the base directory is `project_path` joined onto
  `root_path`; otherwise Drive is mounted when `drive_dir` is missing and
  `project_path` is joined onto `drive_dir`.
  """
  if not use_drive:
    return root_path.joinpath(project_path)

  drive_location = Path(drive_dir)
  if not drive_location.exists():
    # The mount point is the parent of the "MyDrive" folder.
    drive.mount(drive_location.parent.as_posix())

  return drive_dir.joinpath(project_path)

def make_directories() -> bool:
  """Create the project, output, model, tagger and dataset directories.

  Returns:
    True when every directory exists afterwards, False when one of the
    comma-separated names in `dataset_dir_name` is not a valid folder name
    (in which case nothing is created).
  """
  dataset_names = dataset_dir_name.replace(" ", "").split(',')

  # Validate every dataset name up front so a bad name never leaves a half-built tree.
  for dataset_name in dataset_names:
    if not is_valid_folder_name(dataset_name):
      print(f"{dataset_name} is not a valid name for a folder")
      return False

  project_dir = mount_drive_dir()
  output_dir = project_dir.joinpath(output_dir_name)

  # parents=True: the project path may be nested inside folders that do not exist yet.
  project_dir.mkdir(parents=True, exist_ok=True)

  for needed_dir in [pretrained_model_dir, vae_dir, output_dir, tagger_models_dir]:
    Path(needed_dir).mkdir(parents=True, exist_ok=True)

  for dataset_name in dataset_names:
    project_dir.joinpath(dataset_name).mkdir(parents=True, exist_ok=True)

  return True

def main() -> bool:
  """Validate the configured names and build the directory tree.

  Returns:
    True on success, False when a name failed validation.
  """
  # "/" is a legitimate separator inside the project path, so it is ignored there;
  # the output directory must be a single folder name.
  names_to_check = (
    (project_path, project_path.replace("/", "")),
    (output_dir_name, output_dir_name),
  )
  for name, checked_text in names_to_check:
    if not is_valid_folder_name(checked_text):
      print(f"{name} is not a valid name for a folder")
      return False

  print("Setting up directories...")
  if not make_directories():
    return False
  print("Done!")
  return True

# Only flag the step as done when the setup actually succeeded; a validation
# failure must not unlock the later cells.
try:
  second_step_done = main()
except Exception as e:
  print(f"Error setting up the directories!\n{e}")
  second_step_done = False

In [None]:
# @markdown ### Taggea tus imagenes
import os
from pathlib import Path

# @markdown Como sugiere el nombre, este es el tipo de etiquetado que desea para su conjunto de datos.
method = "Anime" # @param ["Anime", "Photorealistic"]
# @markdown `(Solo se aplica al método Anime)` El modelo predeterminado utilizado para etiquetar es `SmilingWolf/wd-eva02-large-tagger-v3`. Lo encuentro más preciso que otros etiquetadores, pero si tienes experiencia, puedes usar otro y modificar los parámetros. Si no lo hace, la configuración predeterminada debería estar bien.)
model = "SmilingWolf/wd-v1-4-swinv2-tagger-v2" # @param ["SmilingWolf/wd-eva02-large-tagger-v3", "SmilingWolf/wd-vit-large-tagger-v3", "SmilingWolf/wd-swinv2-tagger-v3", "SmilingWolf/wd-vit-tagger-v3", "SmilingWolf/wd-convnext-tagger-v3", "SmilingWolf/wd-v1-4-swinv2-tagger-v2", "SmilingWolf/wd-v1-4-moat-tagger-v2", "SmilingWolf/wd-v1-4-convnextv2-tagger-v2", "SmilingWolf/wd-v1-4-convnext-tagger-v2", "SmilingWolf/wd-v1-4-vit-tagger-v2"]
# @markdown El nombre del directorio del conjunto de datos que desea etiquetar. Puede especificar otro directorio cuando el anterior esté completamente etiquetado, en caso de que tenga más de un conjunto de datos.
dataset_dir_name = "dataset" # @param {type: "string"}
# @markdown El tipo de archivo para guardar las tags o subtítulos.
file_extension = ".txt" # @param [".txt", ".caption"]
# @markdown `(Solo se aplica al método Anime)` Especifique las etiquetas que no desea que utilice el etiquetador automático. Separe cada uno con una coma `(,)` como esta: **1girl, 1boy, solo, standing, ...**)
blacklisted_tags = "bangs, breasts, multicolored hair, two-tone hair, gradient hair, virtual youtuber, parody, style parody, official alternate costume, official alternate hairstyle, official alternate hair length, alternate costume, alternate hairstyle, alternate hair length, alternate hair color" # @param {type: "string"}
# @markdown `(Solo se aplica al método Anime)` Especifique el nivel de confianza mínimo requerido para asignar una etiqueta a la imagen. Un umbral más bajo da como resultado que se asignen más etiquetas. El valor predeterminado recomendado para los etiquetadores v2 es 0,35 y para v3 es 0,25.)
threshold = 0.35 # @param {type: "number", min:0.0, max: 1.0, step:0.01}
# @markdown `(Solo se aplica al método fotorrealista)` Especifique la cantidad mínima de palabras (también conocidas como tokens) para incluir en los subtítulos.)
caption_min = 10 # @param {type: "number"}
# @markdown `(Solo se aplica al método fotorrealista)` Especifique la cantidad máxima de palabras (también conocidas como tokens) para incluir en los subtítulos.)
caption_max = 75 # @param {type: "number"}

# Remove every space from the comma-separated list, including the spaces inside
# multi-word tags; the result is interpolated unquoted into the tagger command line.
# NOTE(review): presumably the tagger script matches the space-less tag names — confirm.
blacklisted_tags = blacklisted_tags.replace(" ", "")

def caption_images():
  global use_onnx_runtime

  if not globals().get("second_step_done"):
    print("You didn't complete the second step!")
    return

  dataset_dir = root_path.joinpath(project_path, dataset_dir_name)
  if Path(drive_dir).exists():
    dataset_dir = drive_dir.joinpath(project_path, dataset_dir_name)

  sd_scripts = trainer_dir.joinpath("sd_scripts")
  if not globals().get("first_step_done"):
    print("Please run the step 1 first.")
    return

  if not globals().get("tagger_dependencies"):
    print("Installing missing dependencies...")
    !{venv_pip} install fairscale==0.4.13 timm==0.6.12
    !{venv_pip} install onnxruntime-gpu==1.17.1 --extra-index-url https://aiinfra.pkgs.visualstudio.com/PublicPackages/_packaging/onnxruntime-cuda-12/pypi/simple/
    globals().setdefault("tagger_dependencies", True)

  batch_size = 8 if "v3" in model or "swinv2" in model else 1

  model_dir = tagger_models_dir.joinpath(model.split("/")[-1])

  print("Tagging images")

  if method == "Anime":
    !{venv_python} {wd_path} \
      {dataset_dir} \
      --repo_id={model} \
      --model_dir={model_dir} \
      --thresh={threshold} \
      --batch_size={batch_size} \
      --max_data_loader_n_workers=2 \
      --caption_extension={file_extension} \
      --undesired_tags={blacklisted_tags} \
      --remove_underscore \
      --onnx
  else:
    os.chdir(sd_scripts)
    !{venv_python} finetune/make_captions.py \
      {dataset_dir} \
      --beam_search \
      --max_data_loader_n_workers=2 \
      --batch_size=8 \
      --min_length={caption_min} \
      --max_length={caption_max} \
      --caption_extension=.txt
    os.chdir(root_path)

  print("Tagging complete!")

caption_images()

In [None]:
if "step1_installed_flag" not in globals():
  raise Exception("Please run step 1 first!")

#@markdown ### 5️⃣ Curate your tags
#@markdown Modify your dataset's tags. You can run this cell multiple times with different parameters. <p>

#@markdown Put an activation tag at the start of every text file. This is useful to make learning better and activate your Lora easier. Set `keep_tokens` to 1 when training.<p>
#@markdown Common tags that are removed such as hair color, etc. will be "absorbed" by your activation tag.
global_activation_tag = "" #@param {type:"string"}
remove_tags = "" #@param {type:"string"}
#@markdown &nbsp;

#@markdown In this advanced section, you can search text files containing matching tags, and replace them with less/more/different tags. If you select the checkbox below, any extra tags will be put at the start of the file, letting you assign different activation tags to different parts of your dataset. Still, you may want a more advanced tool for this.
search_tags = "" #@param {type:"string"}
replace_with = "" #@param {type:"string"}
search_mode = "OR" #@param ["OR", "AND"]
new_becomes_activation_tag = False #@param {type:"boolean"}
#@markdown These may be useful sometimes. Will remove existing activation tags, be careful.
sort_alphabetically = False #@param {type:"boolean"}
remove_duplicates = False #@param {type:"boolean"}

def split_tags(tagstr):
  """Split a comma-separated string into stripped, non-empty tags."""
  return [s.strip() for s in tagstr.split(",") if s.strip()]

activation_tag_list = split_tags(global_activation_tag)
remove_tags_list = split_tags(remove_tags)
search_tags_list = split_tags(search_tags)
replace_with_list = split_tags(replace_with)
# Replacement tags that were not among the searched ones are "new" tags ...
replace_new_list = [t for t in replace_with_list if t not in search_tags_list]

# ... and the remainder are searched tags that should be kept.
replace_with_list = [t for t in replace_with_list if t not in replace_new_list]
# Both lists are inserted one by one at index 0 below, so reversing them here
# makes the tags end up in the order they were typed.
replace_new_list.reverse()
activation_tag_list.reverse()

remove_count = 0
replace_count = 0

for txt in [f for f in os.listdir(images_folder) if f.lower().endswith(".txt")]:
  txt_path = os.path.join(images_folder, txt)

  with open(txt_path, 'r') as f:
    # split_tags() drops the blank entries an empty file or a trailing comma would produce.
    tags = split_tags(f.read())

  if remove_duplicates:
    # dict.fromkeys keeps the first occurrence of each tag and the original order.
    tags = list(dict.fromkeys(tags))
  if sort_alphabetically:
    tags.sort()

  for rem in remove_tags_list:
    if rem in tags:
      remove_count += 1
      # Drop every occurrence, not only the first one.
      tags = [t for t in tags if t != rem]

  if "AND" in search_mode and all(r in tags for r in search_tags_list) \
      or "OR" in search_mode and any(r in tags for r in search_tags_list):
    replace_count += 1
    tags = [t for t in tags if t not in search_tags_list]
    for add in replace_with_list:
      if add not in tags:
        tags.append(add)
    for new in replace_new_list:
      if new_becomes_activation_tag:
        # Move (or add) the new tag to the front of the file.
        if new in tags:
          tags.remove(new)
        tags.insert(0, new)
      else:
        if new not in tags:
          tags.append(new)

  # Global activation tags always go first, ahead of anything inserted above.
  for act in activation_tag_list:
    if act in tags:
      tags.remove(act)
    tags.insert(0, act)

  with open(txt_path, 'w') as f:
    f.write(", ".join(tags))

if global_activation_tag:
  print(f"\n📎 Applied new activation tag(s): {', '.join(activation_tag_list)}")
if remove_tags:
  print(f"\n🚮 Removed {remove_count} tags.")
if search_tags:
  print(f"\n💫 Replaced in {replace_count} files.")
print("\n✅ Done! Check your updated tags in the Extras below.")

In [None]:
#@markdown ### 6️⃣ Ready
#@markdown You should be ready to [train your Lora](https://colab.research.google.com/github/hollowstrawberry/kohya-colab/blob/main/Lora_Trainer.ipynb)!

from IPython.display import Markdown, display
# Render a clickable link to the Lora trainer notebook (plain string: nothing is interpolated).
display(Markdown("### 🦀 [Click here to open the Lora trainer](https://colab.research.google.com/github/hollowstrawberry/kohya-colab/blob/main/Lora_Trainer.ipynb)"))

## *️⃣ Extras

In [None]:
if "step1_installed_flag" not in globals():
  raise Exception("Please run step 1 first!")

#@markdown ### 📈 Analyze Tags
#@markdown Perhaps you need another look at your dataset.
show_top_tags = 50 #@param {type:"number"}

from collections import Counter
top_tags = Counter()

# Count how many times each tag occurs across all caption files of the dataset.
for txt in [f for f in os.listdir(images_folder) if f.lower().endswith(".txt")]:
  with open(os.path.join(images_folder, txt), 'r') as f:
    stripped_tags = (s.strip() for s in f.read().split(","))
    # Skip blank entries (empty files, trailing commas) so "" never shows up as a tag.
    top_tags.update(tag for tag in stripped_tags if tag)

print(f"📊 Top {show_top_tags} tags:")
for k, v in top_tags.most_common(show_top_tags):
  print(f"{k} ({v})")

In [None]:
#@markdown ### 📂 Unzip dataset
#@markdown It's much slower to upload individual files to your Drive, so you may want to upload a zip if you have your dataset in your computer.
zip = "/content/drive/MyDrive/Loras/example.zip" #@param {type:"string"}
extract_to = "/content/drive/MyDrive/Loras/example/dataset" #@param {type:"string"}

import os
import zipfile

# Both paths above point into Google Drive, so mount it if that has not happened yet.
if not os.path.exists('/content/drive'):
  from google.colab import drive
  print("📂 Connecting to Google Drive...")
  drive.mount('/content/drive')

# Make sure the destination exists, then unpack the whole archive into it.
os.makedirs(extract_to, exist_ok=True)

with zipfile.ZipFile(zip) as f:
  f.extractall(path=extract_to)

print("✅ Done")

In [None]:
#@markdown ### 🔢 Count datasets
#@markdown Google Drive makes it impossible to count the files in a folder, so this will show you the file counts in all folders and subfolders.
folder = "/content/drive/MyDrive/Loras" #@param {type:"string"}

import os
from google.colab import drive

if not os.path.exists('/content/drive'):
    print("📂 Connecting to Google Drive...\n")
    drive.mount('/content/drive')

# Maps each visited folder (relative to the parent of `folder`) to its summary line,
# or to None when the folder holds no images.
tree = {}
for root, dirs, files in os.walk(folder, topdown=True):
  # Prune in place so os.walk does not descend into log or output folders.
  # `dirs` holds bare folder names, so a pattern containing "/" could never match;
  # folders whose name starts with "output" are skipped instead.
  dirs[:] = [d for d in dirs if "_logs" not in d and not d.startswith("output")]
  images = len([f for f in files if f.lower().endswith((".png", ".jpg", ".jpeg"))])
  captions = len([f for f in files if f.lower().endswith(".txt")])
  others = len(files) - images - captions
  # Keep the last component of `folder` as the start of every displayed path.
  path = root[folder.rfind("/")+1:]
  tree[path] = None if not images else f"{images:>4} images | {captions:>4} captions |"
  if tree[path] and others:
    tree[path] += f" {others:>4} other files"

# default=0: os.walk yields nothing when `folder` does not exist.
pad = max((len(k) for k in tree), default=0)
print("\n".join(f"📁{k.ljust(pad)} | {v}" for k, v in tree.items() if v))

In [None]:
if "step1_installed_flag" not in globals():
  raise Exception("Please run step 1 first!")

from PIL import Image
import os
Image.MAX_IMAGE_PIXELS = None

#@markdown ### 🖼️ Reduce dataset filesize
#@markdown This will convert all images in the project folder to jpeg, reducing filesize without affecting quality too much. This can also solve some errors.
location = images_folder

# Every file is addressed by its full path, so the working directory is never changed.
for folder_path, _subfolders, file_names in os.walk(location):
    converted = False
    for file_name in file_names:
        file_path = os.path.join(folder_path, file_name)
        try:
            # Convert png to jpeg (extension compared case-insensitively)
            if file_path.lower().endswith(".png"):
                if not converted:
                    print(f"Converting {folder_path}")
                    converted = True
                new_file_path = os.path.splitext(file_path)[0] + ".jpeg"
                # The context manager closes the source file before it is deleted.
                with Image.open(file_path) as im:
                    im.convert("RGB").save(new_file_path, quality=95)
                os.remove(file_path)
                file_path = new_file_path
            # Resize large jpegs: anything above 2,000,000 bytes is halved in both dimensions
            if file_path.lower().endswith((".jpeg", ".jpg")) and os.path.getsize(file_path) > 2000000:
                if not converted:
                    print(f"Converting {folder_path}")
                    converted = True
                with Image.open(file_path) as im:
                    resized = im.resize((int(im.width/2), int(im.height/2)))
                # Save after the source handle is closed, since the file is overwritten.
                resized.save(file_path, quality=95)
            # Rename jpg to jpeg
            if file_path.lower().endswith(".jpg"):
                if not converted:
                    print(f"Converting {folder_path}")
                    # Set the flag here too so the messages print once per folder.
                    converted = True
                new_file_path = os.path.splitext(file_path)[0] + ".jpeg"
                os.rename(file_path, new_file_path)
                file_path = new_file_path
        except Exception as e:
            # Best effort: report the failing file and carry on with the rest.
            print(f"An error occurred while processing {os.path.basename(file_path)}: {e}")
    if converted:
        print(f"Converted {folder_path}")


In [None]:
if "step1_installed_flag" not in globals():
  raise Exception("Please run step 1 first!")

#@markdown ### 🚮 Clean folder
#@markdown Careful! Deletes all non-image files in the project folder.

!find {images_folder} -type f ! \( -name '*.png' -o -name '*.jpg' -o -name '*.jpeg' \) -delete