<a href="https://colab.research.google.com/github/KitsunekoFi/ArielACE/blob/main/Data_Preparation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Reserval Data Preparation**
A Colab Notebook For SDXL LoRA Training (Fine-tuning Method)

[visitor-badge]: https://api.visitorbadge.io/api/visitors?path=Kohya%20LoRA%20Trainer%20XL&label=Visitors&labelColor=%2334495E&countColor=%231ABC9C&style=flat&labelStyle=none
[visitor-stats]: https://visitorbadge.io/status?path=Kohya%20LoRA%20Trainer%20XL
[ko-fi-badge]: https://img.shields.io/badge/Support%20me%20on%20Ko--fi-F16061?logo=ko-fi&logoColor=white&style=flat
[ko-fi-link]: https://ko-fi.com/linaqruf

In [None]:
# @title ## **1.1. Install Kohya Trainer**
import os
import zipfile
import shutil
import time
import requests
import torch
from subprocess import getoutput
from IPython.utils import capture
from google.colab import drive

# Reload all variables previously persisted with the IPython `%store` magic.
%store -r

# root_dir
# Every working directory below is derived from `root_dir` (or from a
# directory that is itself derived from it).
root_dir          = "/content"
drive_dir         = os.path.join(root_dir, "drive/MyDrive")
deps_dir          = os.path.join(root_dir, "deps")
repo_dir          = os.path.join(root_dir, "kohya-trainer")
training_dir      = os.path.join(root_dir, "LoRA")
pretrained_model  = os.path.join(root_dir, "pretrained_model")
vae_dir           = os.path.join(root_dir, "vae")
lora_dir          = os.path.join(root_dir, "network_weight")
repositories_dir  = os.path.join(root_dir, "repositories")
config_dir        = os.path.join(training_dir, "config")
tools_dir         = os.path.join(repo_dir, "tools")
finetune_dir      = os.path.join(repo_dir, "finetune")
accelerate_config = os.path.join(repo_dir, "accelerate_config/config.yaml")

# Persist the listed path variables with `%store` so that other cells can get
# them back through `%store -r`. The magic's output is captured to keep the
# cell quiet. Note that `drive_dir`, `deps_dir` and `lora_dir` are not in the
# list and are therefore not persisted.
for store in ["root_dir", "repo_dir", "training_dir", "pretrained_model", "vae_dir", "repositories_dir", "accelerate_config", "tools_dir", "finetune_dir", "config_dir"]:
    with capture.capture_output() as cap:
        %store {store}
        del cap

# Maps the label shown in the Colab form to the git URL that gets cloned.
repo_dict = {
    "qaneel/kohya-trainer (forked repo, stable, optimized for colab use)" : "https://github.com/qaneel/kohya-trainer",
    "kohya-ss/sd-scripts (original repo, latest update)"                    : "https://github.com/kohya-ss/sd-scripts",
}

# The form allows free input, but any value that is not a key of `repo_dict`
# raises KeyError on the lookup below.
repository        = "qaneel/kohya-trainer (forked repo, stable, optimized for colab use)" #@param ["qaneel/kohya-trainer (forked repo, stable, optimized for colab use)", "kohya-ss/sd-scripts (original repo, latest update)"] {allow-input: true}
repo_url          = repo_dict[repository]
branch            = "main"  # @param {type: "string"}
# When enabled, mount_drive() points the output directory at Google Drive.
output_to_drive   = False  # @param {type: "boolean"}

def clone_repo(url, dir, branch):
    """Clone `branch` of the git repository at `url` into `dir`.

    Nothing happens when `dir` already exists, so an existing checkout is
    neither updated nor re-cloned.
    """
    if not os.path.exists(dir):
       !git clone -b {branch} {url} {dir}

def mount_drive(dir):
    """Return the directory that training output should be written to.

    With `output_to_drive` disabled this is ``<training_dir>/output``. When it
    is enabled, Google Drive is mounted first (unless `drive_dir` already
    exists) and ``<drive_dir>/kohya-trainer/output`` is returned.

    The `dir` argument is accepted but not used.
    """
    local_target = os.path.join(training_dir, "output")
    if not output_to_drive:
        return local_target

    already_mounted = os.path.exists(drive_dir)
    if not already_mounted:
        # The mount point is the parent of "MyDrive".
        drive.mount(os.path.dirname(drive_dir))

    return os.path.join(drive_dir, "kohya-trainer/output")

def setup_directories():
    """Resolve the output directory and create every working directory.

    Publishes the resolved location as the global `output_dir`, then makes
    sure the training, config, model, VAE, repositories and output
    directories all exist.
    """
    global output_dir

    output_dir = mount_drive(drive_dir)

    required = (
        training_dir,
        config_dir,
        pretrained_model,
        vae_dir,
        repositories_dir,
        output_dir,
    )
    for folder in required:
        os.makedirs(folder, exist_ok=True)

def pastebin_reader(id):
    """Fetch a Pastebin paste and return its raw text split into lines.

    Args:
        id: Either a bare paste ID (e.g. ``"kq6ZmHFU"``) or a full
            ``pastebin.com`` URL. URLs that do not contain ``raw`` are
            rewritten to the raw endpoint.

    Returns:
        list[str]: The paste body split on newline characters.

    Raises:
        requests.HTTPError: If Pastebin answers with an error status.
        requests.Timeout: If no response arrives within the timeout.
    """
    if "pastebin.com" in id:
        url = id
        if 'raw' not in url:
            url = url.replace('pastebin.com', 'pastebin.com/raw')
    else:
        url = "https://pastebin.com/raw/" + id

    # requests never times out by default; without this a stalled connection
    # would block the whole install cell indefinitely.
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return response.text.split('\n')

def install_repository():
    """Clone the two helper repositories and install their requirements.

    Both repositories are cloned into `repositories_dir` (skipped when the
    target directory already exists, see clone_repo). The clone locations and
    the value read from Pastebin are published as the globals
    `infinite_image_browser_dir`, `discordia_archivum_dir` and `voldy`.
    """
    global infinite_image_browser_dir, voldy, discordia_archivum_dir

    # Only the second line of the paste is kept; it is interpolated into the
    # GitHub repository name below. Unpacking fails with ValueError when the
    # paste has fewer than two lines.
    _, voldy = pastebin_reader("kq6ZmHFU")[:2]

    infinite_image_browser_url  = f"https://github.com/zanllp/{voldy}-infinite-image-browsing.git"
    infinite_image_browser_dir  = os.path.join(repositories_dir, f"infinite-image-browsing")
    infinite_image_browser_deps = os.path.join(infinite_image_browser_dir, "requirements.txt")

    discordia_archivum_url = "https://github.com/Linaqruf/discordia-archivum"
    discordia_archivum_dir = os.path.join(repositories_dir, "discordia-archivum")
    discordia_archivum_deps = os.path.join(discordia_archivum_dir, "requirements.txt")

    clone_repo(infinite_image_browser_url, infinite_image_browser_dir, "main")
    clone_repo(discordia_archivum_url, discordia_archivum_dir, "main")

    # Install each repository's requirements.txt, plus python-dotenv.
    !pip install -q --upgrade -r {infinite_image_browser_deps}
    !pip install python-dotenv
    !pip install -q --upgrade -r {discordia_archivum_deps}

def install_dependencies():
    """Install system packages, the trainer's requirements and xformers.

    Also writes a basic `accelerate` configuration to `accelerate_config`
    when that file does not exist yet.
    """
    requirements_file = os.path.join(repo_dir, "requirements.txt")
    # NOTE(review): `model_util` is assigned but never used in this function.
    model_util        = os.path.join(repo_dir, "library/model_util.py")
    # Raw `nvidia-smi` output; only searched for the substring "T4" below.
    gpu_info          = getoutput('nvidia-smi')
    t4_xformers_wheel = "https://github.com/Linaqruf/colab-xformers/releases/download/0.0.20/xformers-0.0.20+1d635e1.d20230519-cp310-cp310-linux_x86_64.whl"

    !apt install aria2 lz4
    # Saved to the path that prepare_environment() later puts in LD_PRELOAD.
    !wget https://github.com/camenduru/gperftools/releases/download/v1.0/libtcmalloc_minimal.so.4 -O /content/libtcmalloc_minimal.so.4
    !pip install -q --upgrade -r {requirements_file}

    # `torch` was imported before the pip install above, so this checks the
    # version that was already loaded in the running kernel.
    if '2.0.1+cu118' in torch.__version__:
        if 'T4' in gpu_info:
            # T4 GPUs get the prebuilt wheel instead of the PyPI release.
            !pip install -q {t4_xformers_wheel}
        else:
            !pip install -q xformers==0.0.20
    else:
        # Any other torch version: pin the torch 2.0.0 / cu118 stack and the
        # matching xformers and triton releases.
        !pip install -q torch==2.0.0+cu118 torchvision==0.15.1+cu118 torchaudio==2.0.1+cu118 torchtext==0.15.1 torchdata==0.6.0 --extra-index-url https://download.pytorch.org/whl/cu118 -U
        !pip install -q xformers==0.0.19 triton==2.0.0 -U

    # Imported here, after the pip installs above have run.
    from accelerate.utils import write_basic_config

    if not os.path.exists(accelerate_config):
        write_basic_config(save_location=accelerate_config)

def prepare_environment():
    """Export the environment variables used by the rest of the notebook.

    Sets the tcmalloc preload path, lowers TensorFlow's C++ log verbosity,
    enables fast GPU loading for safetensors and silences Python warnings.
    """
    settings = {
        "LD_PRELOAD": "/content/libtcmalloc_minimal.so.4",
        "TF_CPP_MIN_LOG_LEVEL": "3",
        "SAFETENSORS_FAST_GPU": "1",
        "PYTHONWARNINGS": "ignore",
    }
    for variable, value in settings.items():
        os.environ[variable] = value

def main():
    """Run the full installation: clone, create directories, install, configure."""
    os.chdir(root_dir)
    clone_repo(repo_url, repo_dir, branch)
    # The remaining steps run with the trainer checkout as working directory.
    os.chdir(repo_dir)
    setup_directories()
    install_repository()
    install_dependencies()
    prepare_environment()

main()

In [None]:
#@title **Download Pretrained Model**
import os

download_link     = "https://huggingface.co/Lykon/AnyLoRA/resolve/main/AnyLoRA_noVae_fp16-pruned.safetensors" #@param {type:"string"}
folder_path       = "/content/pretrained_model" #@param {type:"string"}
# The local file name is the last path segment of the URL.
file_name = download_link.split("/")[-1]
# NOTE(review): despite its name, `vae_path` is the expected local path of the
# file downloaded by this cell (titled "Download Pretrained Model") — confirm
# before using it as a VAE path elsewhere.
vae_path = os.path.join(folder_path, file_name)

# `-P` makes wget save the download inside `folder_path`.
!wget {download_link} -P {folder_path}
print("Path variabel vae_path:", vae_path)

In [None]:
# @title ## **1.3. Directory Config**
# @markdown Specify the location of your training data in the following cell. A folder with the same name as your input will be created.
import os

# Reload variables persisted by earlier cells.
%store -r

train_data_dir = "/content/LoRA/train_data"  # @param {'type' : 'string'}
# `main_folder` is an alias of `train_data_dir`; both are persisted below.
main_folder = train_data_dir

%store train_data_dir
%store main_folder

# Create the directory up front so later cells can list it or extract into it.
os.makedirs(train_data_dir, exist_ok=True)
print(f"Your train data directory : {train_data_dir}")

In [None]:
# @title ## **1.3. Directory Config**
# @markdown Specify the location of your training data in the following cell. A folder with the same name as your input will be created.
import os

%store -r

part_data = "Part1"  # @param {'type' : 'string'}
%store part_data

print(f"Your train data directory : {part_data}")

In [None]:
# @title ## **2.1. Unzip Dataset**

import os
import subprocess
import re
from urllib.parse import unquote
import requests
from zipfile import ZipFile

# @title ## Unzip Dataset
# @markdown If your dataset is in a `zip` file and has been uploaded to a location, use this section to extract it.
# @markdown The dataset will be downloaded and automatically extracted to `train_data_dir` if `unzip_to` is empty.

zipfile_url = "https://huggingface.co/datasets/Alterneko/a/resolve/main/processed/v2/Part[x]_v2.zip"  # @param {type:"string"}
unzip_to = ""  # @param {type:"string"}
# NOTE(review): this looks like a real Hugging Face access token committed in
# plain text — revoke it and read it from a secret or form field instead.
# It is sent as a Bearer token by download_with_aria2c().
hf_token = "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"

use_aria2c = True # @param {type:"boolean"}
preserve_folders = False # @param {type:"boolean"}
remove_after_unzipping = True # @param {type:"boolean"}

# Turn a Hugging Face "blob" (web page) link into a direct "resolve" link.
# Note that str.replace rewrites every occurrence of "blob" in the URL.
if "huggingface.co" in zipfile_url and "blob" in zipfile_url:
    zipfile_url = zipfile_url.replace("blob", "resolve")

# Default extraction target is the training data directory.
if not unzip_to:
    unzip_to = train_data_dir

def get_filename_from_url(url):
    """Work out the local file name to use for `url`.

    Hugging Face URLs and local ``/content/`` paths already end in the file
    name, so it is taken from the path (any query string or fragment of a
    real URL is ignored). For every other URL a HEAD request is made and the
    ``Content-Disposition`` header is consulted.

    Args:
        url: Download URL or local ``/content/...`` path.

    Returns:
        str: The file name, or ``"zipfile.zip"`` when none can be determined.
    """
    from urllib.parse import urlparse

    # The original test `"huggingface.co" or "/content/" in url` was always
    # true (a non-empty string literal is truthy), which made the HEAD
    # fallback below unreachable.
    if "huggingface.co" in url or "/content/" in url:
        # Only parse real URLs; a local path may legitimately contain "?" or "#".
        path = urlparse(url).path if "://" in url else url
        return os.path.basename(path)

    response = requests.head(url, allow_redirects=True, timeout=30)
    cd = response.headers.get('content-disposition')
    if cd:
        # Accept both filename=name.zip and filename="name.zip", and stop at
        # the next parameter separator.
        match = re.search(r'filename="?([^";]+)', cd)
        if match:
            return unquote(match.group(1).strip())

    return "zipfile.zip"

def download_with_requests(url, output_path):
    """Stream `url` into the file `output_path` using `requests`.

    Args:
        url: HTTP(S) address of the file to download.
        output_path: Destination file; overwritten if it exists.

    Returns:
        str: `output_path`, for chaining into the extraction step.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server stops responding.
    """
    print(f"Downloading {url} with requests...")
    # The context manager releases the connection even if writing fails.
    with requests.get(url, stream=True, timeout=30) as response:
        # Fail before writing, so an HTTP error page is never saved as the
        # archive and later mistaken for a corrupt zip.
        response.raise_for_status()
        with open(output_path, 'wb') as file:
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)
    print(f"Downloaded to {output_path}")
    return output_path

def download_with_aria2c(url, output_path):
    """Download `url` to `output_path` by invoking the `aria2c` command.

    Hugging Face URLs are requested with the module-level `hf_token` as a
    Bearer token. The exit status of aria2c is not inspected.

    Returns:
        str: `output_path`.
    """
    print(f"Downloading {url} with aria2c...")

    target_dir = os.path.dirname(output_path)
    target_name = os.path.basename(output_path)
    options = {
        'console-log-level': 'error',
        'summary-interval': '10',
        'continue': 'true',
        'max-connection-per-server': '16',
        'min-split-size': '1M',
        'split': '16',
        'dir': target_dir,
        'out': target_name,
    }

    if "huggingface.co" in url:
        options['header'] = f"Authorization: Bearer {hf_token}"

    # Each option becomes one --key=value argument; the URL goes last.
    command = ['aria2c']
    command.extend(f'--{name}={value}' for name, value in options.items())
    command.append(url)

    subprocess.run(command)
    print(f"Downloaded to {output_path}")
    return output_path

def move_files(train_dir):
    """Relocate ``meta_*.json`` files from `train_dir` into `training_dir`.

    A metadata file is moved when `training_dir` does not already hold a file
    of that name; otherwise the copy in `train_dir` is deleted so it does not
    stay in the image folder. Other files are left untouched.

    Args:
        train_dir: Directory that was just populated by the extraction step.
    """
    import shutil  # this cell does not import shutil itself

    for filename in os.listdir(train_dir):
        if not (filename.startswith("meta_") and filename.endswith(".json")):
            continue

        file_path = os.path.join(train_dir, filename)
        destination = os.path.join(training_dir, filename)

        # Extracting straight into training_dir: the file is already in place.
        if os.path.abspath(file_path) == os.path.abspath(destination):
            continue

        # The original tested `file_path`, which always exists because it was
        # just listed, so every metadata file was deleted and none was moved.
        if not os.path.exists(destination):
            shutil.move(file_path, destination)
        else:
            os.remove(file_path)

def remove_empty_dirs(path):
    """Delete every empty directory below `path`, deepest first.

    Walking bottom-up means a directory that only contained empty
    directories is itself removed. `path` itself is never deleted.
    """
    for parent, subdirs, _ in os.walk(path, topdown=False):
        for candidate in (os.path.join(parent, name) for name in subdirs):
            if os.listdir(candidate):
                # Still has content; keep it.
                continue
            os.rmdir(candidate)
            print(f"Removed empty directory: {candidate}")

def extract_dataset(zip_file, output_path):
    """Extract the archive `zip_file` into `output_path`.

    With the module-level `preserve_folders` flag set, the archive layout is
    kept as is. Otherwise every file ends up directly inside `output_path`
    under its base name, and directories left empty are removed afterwards.
    """
    with ZipFile(zip_file, 'r') as archive:
        print(f"Extracting {zip_file} to {output_path}...")

        if preserve_folders:
            archive.extractall(output_path)
        else:
            for entry in archive.namelist():
                flat_name = os.path.basename(entry)
                if not flat_name:
                    # Directory entries end with "/" and have no base name.
                    continue
                archive.extract(entry, output_path)
                extracted_at = os.path.join(output_path, entry)
                flattened_at = os.path.join(output_path, flat_name)
                os.rename(extracted_at, flattened_at)

            remove_empty_dirs(output_path)

        print("Extraction completed!")

def download_dataset(url, output_path):
    """Obtain the dataset archive and return the path where it can be found.

    * A `url` starting with ``/content`` is treated as an existing local file
      and returned unchanged.
    * Google Drive links are fetched with the `gdown` command.
    * Everything else is downloaded with aria2c or requests, depending on the
      module-level `use_aria2c` flag.
    """
    if url.startswith("/content"):
        print(f"Using file at {url}")
        return url

    if "drive.google.com" in url:
        print("Downloading from Google Drive...")
        # The file id is taken as the second-to-last path segment of the link.
        file_id = url.split('/')[-2]
        subprocess.run(['gdown', '--id', file_id, '-O', output_path])
        return output_path

    if use_aria2c:
        return download_with_aria2c(url, output_path)
    return download_with_requests(url, output_path)

def main():
    """Download (or locate) the dataset archive, extract it and tidy up."""
    zipfile_name = get_filename_from_url(zipfile_url)
    # Downloads are saved directly under `root_dir`.
    output_path = os.path.join(root_dir, zipfile_name)

    # May return a path other than `output_path` (local files are used in place).
    zip_file = download_dataset(zipfile_url, output_path)

    extract_dataset(zip_file, unzip_to)

    move_files(unzip_to)

    # Archives located under the mounted Google Drive are never deleted.
    if remove_after_unzipping and "/content/drive" not in zip_file:
        os.remove(zip_file)
        print(f"Removed {zip_file}")

main()


In [None]:
# @title ## **3.1. Data Cleaning**
import os
import random
import concurrent.futures
from tqdm import tqdm
from PIL import Image

# Reload variables persisted by earlier cells.
%store -r

os.chdir(root_dir)

# NOTE(review): `test` is not used anywhere else in this cell; the call does
# fail early when `train_data_dir` is missing.
test = os.listdir(train_data_dir)
#@markdown This section removes unsupported media types such as `.mp4`, `.webm`, and `.gif`, as well as any unnecessary files.
#@markdown To convert a transparent dataset with an alpha channel (RGBA) to RGB and give it a white background, set the `convert` parameter to `True`.
convert = True  # @param {type:"boolean"}
#@markdown Alternatively, you can give the background a `random_color` instead of white by checking the corresponding option.
random_color = False  # @param {type:"boolean"}
# Controls whether clean_directory() descends into subdirectories.
recursive = False

# Number of images handed to the thread pool per progress-bar step.
batch_size = 32
# Files with any other extension are deleted by clean_directory().
supported_types = [
    ".png",
    ".jpg",
    ".jpeg",
    ".webp",
    ".bmp",
    ".json",
]

# RGB fill colours that process_image() picks from when `random_color` is set.
background_colors = [
    (255, 255, 255),
    (0, 0, 0),
    (255, 0, 0),
    (0, 255, 0),
    (0, 0, 255),
    (255, 255, 0),
    (255, 0, 255),
    (0, 255, 255),
]

def clean_directory(directory):
    """Delete every file in `directory` whose extension is not supported.

    The extension is compared against the module-level `supported_types`
    without regard to case. Subdirectories are only processed when the
    module-level `recursive` flag is set.

    Args:
        directory: Folder to clean in place.
    """
    for item in os.listdir(directory):
        file_path = os.path.join(directory, item)
        if os.path.isfile(file_path):
            # Lower-case first: a case-sensitive comparison deleted valid
            # images such as "photo.JPG" or "scan.PNG".
            file_ext = os.path.splitext(item)[1].lower()
            if file_ext not in supported_types:
                os.remove(file_path)
        elif os.path.isdir(file_path) and recursive:
            clean_directory(file_path)

def process_image(image_path):
    """Flatten transparency and normalise the file format of one image.

    * Images in mode ``RGBA`` or ``LA`` are pasted onto an opaque RGB
      background (white, or a random entry of `background_colors` when
      `random_color` is set), using their last band as the mask.
    * ``.webp`` files are re-saved as JPEG under a ``.jpg`` name and the
      original file is removed.
    * Every other file is re-saved in place as PNG.
    """
    img = Image.open(image_path)
    img_dir, image_name = os.path.split(image_path)
    is_webp = image_name.endswith(".webp")

    if img.mode in ("RGBA", "LA"):
        fill = random.choice(background_colors) if random_color else (255, 255, 255)
        flattened = Image.new("RGB", img.size, fill)
        flattened.paste(img, mask=img.split()[-1])

        if not is_webp:
            flattened.save(image_path, "PNG")
            print(f" Converted image: {image_name}")
            return
        to_export = flattened.convert("RGB")
    else:
        if not is_webp:
            img.save(image_path, "PNG")
            return
        to_export = img

    # Only .webp inputs reach this point: write the JPEG, then drop the source.
    new_image_path = os.path.join(img_dir, image_name.replace(".webp", ".jpg"))
    to_export.save(new_image_path, "JPEG")
    os.remove(image_path)
    print(f" Converted image: {image_name} to {os.path.basename(new_image_path)}")

def find_images(directory):
    """Return the paths of all ``.png`` and ``.webp`` files under `directory`.

    The search covers all subdirectories and matches the extension
    case-sensitively.
    """
    return [
        os.path.join(folder, name)
        for folder, _, names in os.walk(directory)
        for name in names
        if name.endswith((".png", ".webp"))
    ]

# Unsupported files are removed regardless of the `convert` setting.
clean_directory(train_data_dir)
images = find_images(train_data_dir)
# Integer division plus one: when the image count is an exact multiple of
# `batch_size`, the last batch is simply empty.
num_batches = len(images) // batch_size + 1

if convert:
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for i in tqdm(range(num_batches)):
            start = i * batch_size
            end = start + batch_size
            batch = images[start:end]
            # NOTE(review): the iterator returned by map() is discarded, so an
            # exception raised inside process_image is never surfaced here,
            # and the progress bar tracks submission rather than completion.
            executor.map(process_image, batch)

    print("All images have been converted")

In [None]:
#@title ## **Move Lowres Data**

import os
import shutil

def move_small_files(directory, max_size_kb, dest_folder):
    """Move every file under `directory` smaller than `max_size_kb` KB.

    Files from all subdirectories are placed directly in `dest_folder`
    (created if missing) under their base name.

    Returns:
        int: Number of files that were moved.
    """
    if not os.path.exists(dest_folder):
        os.makedirs(dest_folder)

    relocated = 0
    for folder, _subdirs, names in os.walk(directory):
        for name in names:
            source = os.path.join(folder, name)
            size_kb = os.path.getsize(source) / 1024  # bytes -> KB
            if size_kb < max_size_kb:
                shutil.move(source, os.path.join(dest_folder, name))
                relocated += 1
    return relocated

# NOTE(review): this reassigns `train_data_dir` for the running session but,
# unlike the directory-config cell, does not `%store` the new value.
train_data_dir = "/content/LoRA/train_data" # @param {type:"string"}
# Files below this size (in KB) are moved out of the training data.
max_file_size_kb = 50 # @param {type:"number"}
destination_folder = "/content/LoRA/lowres_Data" # @param {type:"string"}

total_moved = move_small_files(train_data_dir, max_file_size_kb, destination_folder)
print(f"Total files moved: {total_moved}")


In [None]:
#@title ## **Tagging for NSFW Filted**
import os

%cd /content/kohya-trainer
!wget https://huggingface.co/datasets/Alterneko/Reserval/resolve/main/waifu.py
use_gpu = True #@param {type:"boolean"}

!python waifu.py --img_dir {train_data_dir} --use_gpu

In [None]:
#@title ## **Split Dataset to Right Folder**

import os
import shutil

def organize_files(input_folder, output_folder):
    """Sort the entries of `input_folder` into rating subfolders.

    An entry is moved to ``<output_folder>/safe``, ``/suggestive`` or
    ``/explicit`` according to the first of those words (checked in that
    order) that occurs in its name. Entries matching none of them stay put.
    """
    # Insertion order doubles as the matching priority.
    destinations = {
        label: os.path.join(output_folder, label)
        for label in ("safe", "suggestive", "explicit")
    }

    # Create the subfolders if they do not exist yet.
    for target in destinations.values():
        os.makedirs(target, exist_ok=True)

    # Snapshot of the input folder's entries.
    for entry in os.listdir(input_folder):
        for label, target in destinations.items():
            if label in entry:
                shutil.move(os.path.join(input_folder, entry), os.path.join(target, entry))
                break

if __name__ == "__main__":
    input_folder = train_data_dir   # Replace with the appropriate input folder path
    output_folder = "/content/filtered"  #@param {type:"string"}
    organize_files(input_folder, output_folder)
    # The message below is Indonesian for "File move finished."
    print("Pemindahan file selesai.")


In [None]:
#@title ## **Rename Dataset with Random Name and Prefix**
%store -r
import os
import secrets

# Change folder_to_rename and prefix_to_add below to suit your needs.
# NOTE(review): `part_to_rename` is not referenced anywhere in this cell; the
# prefix is built from the stored `part_data` value instead.
part_to_rename = "[x]" #@param {type:"string"}
type_to_rename = "explicit" #@param ["safe", "explicit", "suggestive"]
folder_to_rename = "/content/filtered/" + type_to_rename
prefix_to_add = "v3_" + part_data + "_" + type_to_rename

def generate_random_string(length):
    """Return a random lowercase hexadecimal string of `length` characters.

    Args:
        length: Number of characters wanted.

    Returns:
        str: Exactly `length` hex digits, drawn from the `secrets` module.
    """
    # token_hex(n) yields two characters per byte. Round the byte count up and
    # trim, so an odd `length` is honoured instead of coming out one short.
    return secrets.token_hex((length + 1) // 2)[:length]

def rename_files_in_folder(folder_path, prefix):
    """Rename every regular file in `folder_path` to ``<prefix>_<random><ext>``.

    The random part is a 10-character string from generate_random_string();
    the original extension is kept. Subdirectories are skipped.
    """
    for entry in os.listdir(folder_path):
        current = os.path.join(folder_path, entry)
        if not os.path.isfile(current):
            continue
        _, suffix = os.path.splitext(entry)
        renamed = f"{prefix}_{generate_random_string(10)}{suffix}"
        os.rename(current, os.path.join(folder_path, renamed))

target_folder = folder_to_rename
prefix = prefix_to_add

# Give every file in the selected rating folder a "<prefix>_<random hex>" name.
rename_files_in_folder(target_folder, prefix)
print("Files renamed successfully.")



In [None]:
#@title ## **Rename Dataset with Random Name and Prefix**
%store -r
import os
import secrets

# Change folder_to_rename and prefix_to_add below to suit your needs.
# NOTE(review): `part_to_rename` is not referenced anywhere in this cell; the
# prefix is built from the stored `part_data` value instead.
part_to_rename = "[x]" #@param {type:"string"}
type_to_rename = "safe" #@param ["safe", "explicit", "suggestive"]
folder_to_rename = "/content/filtered/" + type_to_rename
prefix_to_add = "v3_" + part_data + "_" + type_to_rename

def generate_random_string(length):
    """Return a random lowercase hexadecimal string of `length` characters.

    Args:
        length: Number of characters wanted.

    Returns:
        str: Exactly `length` hex digits, drawn from the `secrets` module.
    """
    # token_hex(n) yields two characters per byte. Round the byte count up and
    # trim, so an odd `length` is honoured instead of coming out one short.
    return secrets.token_hex((length + 1) // 2)[:length]

def rename_files_in_folder(folder_path, prefix):
    """Rename every regular file in `folder_path` to ``<prefix>_<random><ext>``.

    The random part is a 10-character string from generate_random_string();
    the original extension is kept. Subdirectories are skipped.
    """
    for entry in os.listdir(folder_path):
        current = os.path.join(folder_path, entry)
        if not os.path.isfile(current):
            continue
        _, suffix = os.path.splitext(entry)
        renamed = f"{prefix}_{generate_random_string(10)}{suffix}"
        os.rename(current, os.path.join(folder_path, renamed))

target_folder = folder_to_rename
prefix = prefix_to_add

# Give every file in the selected rating folder a "<prefix>_<random hex>" name.
rename_files_in_folder(target_folder, prefix)
print("Files renamed successfully.")



In [None]:
#@title ## **Rename Dataset with Random Name and Prefix**
%store -r
import os
import secrets

# Change folder_to_rename and prefix_to_add below to suit your needs.
# NOTE(review): `part_to_rename` is not referenced anywhere in this cell; the
# prefix is built from the stored `part_data` value instead.
part_to_rename = "[x]" #@param {type:"string"}
type_to_rename = "suggestive" #@param ["safe", "explicit", "suggestive"]
folder_to_rename = "/content/filtered/" + type_to_rename
prefix_to_add = "v3_" + part_data + "_" + type_to_rename

def generate_random_string(length):
    """Return a random lowercase hexadecimal string of `length` characters.

    Args:
        length: Number of characters wanted.

    Returns:
        str: Exactly `length` hex digits, drawn from the `secrets` module.
    """
    # token_hex(n) yields two characters per byte. Round the byte count up and
    # trim, so an odd `length` is honoured instead of coming out one short.
    return secrets.token_hex((length + 1) // 2)[:length]

def rename_files_in_folder(folder_path, prefix):
    """Rename every regular file in `folder_path` to ``<prefix>_<random><ext>``.

    The random part is a 10-character string from generate_random_string();
    the original extension is kept. Subdirectories are skipped.
    """
    for entry in os.listdir(folder_path):
        current = os.path.join(folder_path, entry)
        if not os.path.isfile(current):
            continue
        _, suffix = os.path.splitext(entry)
        renamed = f"{prefix}_{generate_random_string(10)}{suffix}"
        os.rename(current, os.path.join(folder_path, renamed))

target_folder = folder_to_rename
prefix = prefix_to_add

# Give every file in the selected rating folder a "<prefix>_<random hex>" name.
rename_files_in_folder(target_folder, prefix)
print("Files renamed successfully.")



In [None]:
#@title ### **3.2.2. Waifu Diffusion 1.4 Tagger V2**
import os
# Reload variables persisted by earlier cells (`finetune_dir` is needed below).
%store -r

os.chdir(finetune_dir)

tagging_dir = "/content/filtered" #@param {type:"string"}
#@markdown [Waifu Diffusion 1.4 Tagger V2](https://huggingface.co/spaces/SmilingWolf/wd-v1-4-tags) is a Danbooru-styled image classification model developed by SmilingWolf. It can also be useful for general image tagging, for example, `1girl, solo, looking_at_viewer, short_hair, bangs, simple_background`.
model = "SmilingWolf/wd-v1-4-moat-tagger-v2" #@param ["SmilingWolf/wd-v1-4-moat-tagger-v2", "SmilingWolf/wd-v1-4-convnextv2-tagger-v2", "SmilingWolf/wd-v1-4-swinv2-tagger-v2", "SmilingWolf/wd-v1-4-convnext-tagger-v2", "SmilingWolf/wd-v1-4-vit-tagger-v2"]
#@markdown Separate `undesired_tags` with comma `(,)` if you want to remove multiple tags, e.g. `1girl,solo,smile`.
undesired_tags = "realistic,cosplay" #@param {type:'string'}
#@markdown Adjust `general_threshold` for pruning tags (less tags, less flexible). `character_threshold` is useful if you want to train with character tags, e.g. `hakurei reimu`.
general_threshold = 0.3 #@param {type:"slider", min:0, max:1, step:0.05}
character_threshold = 0.8 #@param {type:"slider", min:0, max:1, step:0.05}

# Command-line arguments for the tagger script. A key starting with "_" is
# emitted as a bare positional value; every other key becomes a --key option.
config = {
    "_train_data_dir"           : tagging_dir,
    "batch_size"                : 8,
    "repo_id"                   : model,
    "recursive"                 : True,
    "remove_underscore"         : True,
    "general_threshold"         : general_threshold,
    "character_threshold"       : character_threshold,
    "caption_extension"         : ".txt",
    "max_data_loader_n_workers" : 2,
    "debug"                     : True,
    "undesired_tags"            : undesired_tags
}

# Render `config` as one shell argument string:
#   "_key"   -> "value"          (positional, quoted)
#   str      -> --key="value"
#   True     -> --key            (False booleans are omitted entirely)
#   float    -> --key=value
#   int      -> --key=value
# bool is excluded from the numeric branches because it is a subclass of int.
args = ""
for k, v in config.items():
    if k.startswith("_"):
        args += f'"{v}" '
    elif isinstance(v, str):
        args += f'--{k}="{v}" '
    elif isinstance(v, bool) and v:
        args += f"--{k} "
    elif isinstance(v, float) and not isinstance(v, bool):
        args += f"--{k}={v} "
    elif isinstance(v, int) and not isinstance(v, bool):
        args += f"--{k}={v} "

final_args = f"python tag_images_by_wd14_tagger.py {args}"

# The script name is relative, so the command must run from `finetune_dir`
# (this repeats the chdir made earlier in the cell).
os.chdir(finetune_dir)
!{final_args}

In [None]:
# @title ### **3.2.3. Custom Caption/Tag (FOR BACKUP, NOT USED)**
import os

# Reload variables persisted by earlier cells.
%store -r

os.chdir(root_dir)

type_to_tag = "safe" #@param ["safe", "explicit", "suggestive"]
# @markdown Add or remove custom tags here.
extension   = ".txt"  # @param [".txt", ".caption"]
custom_tag  = ""  # @param {type:"string"}
# @markdown Use `sub_folder` option to specify a subfolder for multi-concept training.
# @markdown > Specify `--all` to process all subfolders/`recursive`
sub_folder  = "/content/filtered/" #@param {type: "string"}
# @markdown Enable this to append custom tags at the end of lines.
append      = False  # @param {type:"boolean"}
# @markdown Enable this if you want to remove captions/tags instead.
remove_tag  = False  # @param {type:"boolean"}
recursive   = False

# The rating selects both the tag to add and the folder to process. For any
# of the three ratings this overwrites the `custom_tag` form value and appends
# the rating name to `sub_folder`.
if type_to_tag == "safe":
  custom_tag = "1girl"
  sub_folder = sub_folder + type_to_tag
elif type_to_tag == "explicit":
  custom_tag = "nsfw"
  sub_folder = sub_folder + type_to_tag
elif type_to_tag == "suggestive":
  custom_tag = "sexually suggestive"
  sub_folder = sub_folder + type_to_tag

# Resolve the folder to process: empty -> the training data directory,
# "--all" -> the training data directory processed recursively,
# an absolute /content path -> used as is,
# anything else -> a subfolder of the training data directory (created if missing).
if sub_folder == "":
    image_dir = train_data_dir
elif sub_folder == "--all":
    image_dir = train_data_dir
    recursive = True
elif sub_folder.startswith("/content"):
    image_dir = sub_folder
else:
    image_dir = os.path.join(train_data_dir, sub_folder)
    os.makedirs(image_dir, exist_ok=True)

def read_file(filename):
    """Return the entire text content of `filename`."""
    with open(filename, "r") as source:
        return source.read()

def write_file(filename, contents):
    """Replace the content of `filename` with the text `contents`."""
    with open(filename, "w") as sink:
        sink.write(contents)

def process_tags(filename, custom_tag, append, remove_tag):
    """Add or remove custom tags in one comma-separated caption file.

    Args:
        filename: Caption file to rewrite in place.
        custom_tag: One tag or several separated by commas; underscores are
            replaced with spaces before comparison.
        append: Add missing tags at the end instead of the front.
        remove_tag: Remove the tags instead of adding them.
    """
    contents = read_file(filename)
    # Drop blank entries: an empty caption file used to parse as [''] and was
    # written back as "1girl, " with a dangling empty tag.
    tags = [tag.strip() for tag in contents.split(',') if tag.strip()]
    wanted_tags = [tag.strip() for tag in custom_tag.split(',') if tag.strip()]

    for wanted in wanted_tags:
        wanted = wanted.replace("_", " ")
        if remove_tag:
            # Remove every occurrence, not only the first.
            tags = [tag for tag in tags if tag != wanted]
        elif wanted not in tags:
            if append:
                tags.append(wanted)
            else:
                tags.insert(0, wanted)

    write_file(filename, ', '.join(tags))

def process_directory(image_dir, tag, append, remove_tag, recursive):
    """Apply process_tags() to every caption file in `image_dir`.

    Caption files are those whose name ends with the module-level
    `extension`. Subdirectories are processed too when `recursive` is true.
    """
    for entry in os.listdir(image_dir):
        entry_path = os.path.join(image_dir, entry)

        if os.path.isdir(entry_path) and recursive:
            process_directory(entry_path, tag, append, remove_tag, recursive)
            continue

        if entry.endswith(extension):
            process_tags(entry_path, tag, append, remove_tag)

tag = custom_tag

# Only when the folder holds no caption file at all, create an empty caption
# next to every image so that process_directory() has something to edit.
if not any(filename.endswith(extension) for filename in os.listdir(image_dir)):
    for filename in os.listdir(image_dir):
        if filename.endswith((".png", ".jpg", ".jpeg", ".webp", ".bmp")):
            # splitext strips only the final extension; split(".")[0] cut the
            # name at its first dot ("a.b.png" -> "a.txt"), which detached the
            # caption from its image.
            caption_path = os.path.join(image_dir, os.path.splitext(filename)[0] + extension)
            open(caption_path, "w").close()

if custom_tag:
    process_directory(image_dir, tag, append, remove_tag, recursive)

In [None]:
# @title ### **3.2.3. Custom Caption/Tag (FOR BACKUP, NOT USED)**
import os

# Reload variables persisted by earlier cells.
%store -r

os.chdir(root_dir)

type_to_tag = "explicit" #@param ["safe", "explicit", "suggestive"]
# @markdown Add or remove custom tags here.
extension   = ".txt"  # @param [".txt", ".caption"]
custom_tag  = ""  # @param {type:"string"}
# @markdown Use `sub_folder` option to specify a subfolder for multi-concept training.
# @markdown > Specify `--all` to process all subfolders/`recursive`
sub_folder  = "/content/filtered/" #@param {type: "string"}
# @markdown Enable this to append custom tags at the end of lines.
append      = False  # @param {type:"boolean"}
# @markdown Enable this if you want to remove captions/tags instead.
remove_tag  = False  # @param {type:"boolean"}
recursive   = False

# The rating selects both the tag to add and the folder to process. For any
# of the three ratings this overwrites the `custom_tag` form value and appends
# the rating name to `sub_folder`.
if type_to_tag == "safe":
  custom_tag = "1girl"
  sub_folder = sub_folder + type_to_tag
elif type_to_tag == "explicit":
  custom_tag = "nsfw"
  sub_folder = sub_folder + type_to_tag
elif type_to_tag == "suggestive":
  custom_tag = "sexually suggestive"
  sub_folder = sub_folder + type_to_tag

# Resolve the folder to process: empty -> the training data directory,
# "--all" -> the training data directory processed recursively,
# an absolute /content path -> used as is,
# anything else -> a subfolder of the training data directory (created if missing).
if sub_folder == "":
    image_dir = train_data_dir
elif sub_folder == "--all":
    image_dir = train_data_dir
    recursive = True
elif sub_folder.startswith("/content"):
    image_dir = sub_folder
else:
    image_dir = os.path.join(train_data_dir, sub_folder)
    os.makedirs(image_dir, exist_ok=True)

def read_file(filename):
    """Return the entire text content of `filename`."""
    with open(filename, "r") as source:
        return source.read()

def write_file(filename, contents):
    """Replace the content of `filename` with the text `contents`."""
    with open(filename, "w") as sink:
        sink.write(contents)

def process_tags(filename, custom_tag, append, remove_tag):
    """Add or remove custom tags in one comma-separated caption file.

    Args:
        filename: Caption file to rewrite in place.
        custom_tag: One tag or several separated by commas; underscores are
            replaced with spaces before comparison.
        append: Add missing tags at the end instead of the front.
        remove_tag: Remove the tags instead of adding them.
    """
    contents = read_file(filename)
    # Drop blank entries: an empty caption file used to parse as [''] and was
    # written back as "nsfw, " with a dangling empty tag.
    tags = [tag.strip() for tag in contents.split(',') if tag.strip()]
    wanted_tags = [tag.strip() for tag in custom_tag.split(',') if tag.strip()]

    for wanted in wanted_tags:
        wanted = wanted.replace("_", " ")
        if remove_tag:
            # Remove every occurrence, not only the first.
            tags = [tag for tag in tags if tag != wanted]
        elif wanted not in tags:
            if append:
                tags.append(wanted)
            else:
                tags.insert(0, wanted)

    write_file(filename, ', '.join(tags))

def process_directory(image_dir, tag, append, remove_tag, recursive):
    """Apply process_tags() to every caption file in `image_dir`.

    Caption files are those whose name ends with the module-level
    `extension`. Subdirectories are processed too when `recursive` is true.
    """
    for entry in os.listdir(image_dir):
        entry_path = os.path.join(image_dir, entry)

        if os.path.isdir(entry_path) and recursive:
            process_directory(entry_path, tag, append, remove_tag, recursive)
            continue

        if entry.endswith(extension):
            process_tags(entry_path, tag, append, remove_tag)

tag = custom_tag

# Only when the folder holds no caption file at all, create an empty caption
# next to every image so that process_directory() has something to edit.
if not any(filename.endswith(extension) for filename in os.listdir(image_dir)):
    for filename in os.listdir(image_dir):
        if filename.endswith((".png", ".jpg", ".jpeg", ".webp", ".bmp")):
            # splitext strips only the final extension; split(".")[0] cut the
            # name at its first dot ("a.b.png" -> "a.txt"), which detached the
            # caption from its image.
            caption_path = os.path.join(image_dir, os.path.splitext(filename)[0] + extension)
            open(caption_path, "w").close()

if custom_tag:
    process_directory(image_dir, tag, append, remove_tag, recursive)

In [None]:
# @title ### **3.2.3. Custom Caption/Tag (FOR BACKUP, NOT USED)**
import os

# Reload variables persisted by earlier cells.
%store -r

os.chdir(root_dir)

type_to_tag = "suggestive" #@param ["safe", "explicit", "suggestive"]
# @markdown Add or remove custom tags here.
extension   = ".txt"  # @param [".txt", ".caption"]
custom_tag  = ""  # @param {type:"string"}
# @markdown Use `sub_folder` option to specify a subfolder for multi-concept training.
# @markdown > Specify `--all` to process all subfolders/`recursive`
sub_folder  = "/content/filtered/" #@param {type: "string"}
# @markdown Enable this to append custom tags at the end of lines.
append      = False  # @param {type:"boolean"}
# @markdown Enable this if you want to remove captions/tags instead.
remove_tag  = False  # @param {type:"boolean"}
recursive   = False

# The rating selects both the tag to add and the folder to process. For any
# of the three ratings this overwrites the `custom_tag` form value and appends
# the rating name to `sub_folder`.
if type_to_tag == "safe":
  custom_tag = "1girl"
  sub_folder = sub_folder + type_to_tag
elif type_to_tag == "explicit":
  custom_tag = "nsfw"
  sub_folder = sub_folder + type_to_tag
elif type_to_tag == "suggestive":
  custom_tag = "sexually suggestive"
  sub_folder = sub_folder + type_to_tag

# Resolve the folder to process: empty -> the training data directory,
# "--all" -> the training data directory processed recursively,
# an absolute /content path -> used as is,
# anything else -> a subfolder of the training data directory (created if missing).
if sub_folder == "":
    image_dir = train_data_dir
elif sub_folder == "--all":
    image_dir = train_data_dir
    recursive = True
elif sub_folder.startswith("/content"):
    image_dir = sub_folder
else:
    image_dir = os.path.join(train_data_dir, sub_folder)
    os.makedirs(image_dir, exist_ok=True)

def read_file(filename):
    """Return the entire text content of `filename`."""
    with open(filename, "r") as source:
        return source.read()

def write_file(filename, contents):
    """Replace the content of `filename` with the text `contents`."""
    with open(filename, "w") as sink:
        sink.write(contents)

def process_tags(filename, custom_tag, append, remove_tag):
    """Add or remove custom tags in one comma-separated caption file.

    Args:
        filename: Caption file to rewrite in place.
        custom_tag: One tag or several separated by commas; underscores are
            replaced with spaces before comparison.
        append: Add missing tags at the end instead of the front.
        remove_tag: Remove the tags instead of adding them.
    """
    contents = read_file(filename)
    # Drop blank entries: an empty caption file used to parse as [''] and was
    # written back as "sexually suggestive, " with a dangling empty tag.
    tags = [tag.strip() for tag in contents.split(',') if tag.strip()]
    wanted_tags = [tag.strip() for tag in custom_tag.split(',') if tag.strip()]

    for wanted in wanted_tags:
        wanted = wanted.replace("_", " ")
        if remove_tag:
            # Remove every occurrence, not only the first.
            tags = [tag for tag in tags if tag != wanted]
        elif wanted not in tags:
            if append:
                tags.append(wanted)
            else:
                tags.insert(0, wanted)

    write_file(filename, ', '.join(tags))

def process_directory(image_dir, tag, append, remove_tag, recursive):
    """Apply ``process_tags`` to every caption file found in ``image_dir``.

    Args:
        image_dir: Directory to scan.
        tag: Comma-separated tag string forwarded to ``process_tags``.
        append: Forwarded to ``process_tags``.
        remove_tag: Forwarded to ``process_tags``.
        recursive: Also descend into sub-directories.

    A caption file is any non-directory entry whose name ends with the
    module-level ``extension``.
    """
    for filename in os.listdir(image_dir):
        file_path = os.path.join(image_dir, filename)

        if os.path.isdir(file_path):
            # Directories are never caption files, even if their name happens
            # to end with the caption extension; opening one would raise.
            if recursive:
                process_directory(file_path, tag, append, remove_tag, recursive)
            continue

        if filename.endswith(extension):
            process_tags(file_path, tag, append, remove_tag)
tag = custom_tag

# When the folder holds no caption file at all, create an empty one next to
# every image so that there is something to write the tags into.
if not any(
    filename.endswith(extension) for filename in os.listdir(image_dir)
):
    for filename in os.listdir(image_dir):
        if filename.endswith((".png", ".jpg", ".jpeg", ".webp", ".bmp")):
            # splitext removes only the final suffix, so "img.v2.png" maps to
            # "img.v2" + extension; split(".")[0] would give "img" and the
            # caption would no longer share the image's base name.
            stem = os.path.splitext(filename)[0]
            open(os.path.join(image_dir, stem + extension), "w").close()

# Nothing to add or remove when no tag was provided.
if custom_tag:
    process_directory(image_dir, tag, append, remove_tag, recursive)

In [None]:
# @title ## **3.4. Bucketing and Latents Caching**
%store -r

data_type = "safe" #@param ["explicit", "safe", "suggestive"]

%mkdir "/content/meta/"
meta_dir = "/content/meta/"

filtered_dir = "/content/filtered/" + data_type
# @markdown This code will create buckets based on the `bucket_resolution` provided for multi-aspect ratio training, and then convert all images within the `train_data_dir` to latents.
bucketing_json    = os.path.join(meta_dir, f"{data_type}_meta_lat.json")
metadata_json     = os.path.join(meta_dir, f"{data_type}_meta_clean.json")
bucket_resolution = 512  # @param {type:"slider", min:512, max:1024, step:128}
mixed_precision   = "no"  # @param ["no", "fp16", "bf16"] {allow-input: false}
skip_existing     = False  # @param{type:"boolean"}
flip_aug          = True  # @param{type:"boolean"}
# @markdown Use `clean_caption` option to clean such as duplicate tags, `women` to `girl`, etc
clean_caption     = True #@param {type:"boolean"}
#@markdown Use the `recursive` option to process subfolders as well
recursive         = False #@param {type:"boolean"}

metadata_config = {
    "_train_data_dir": filtered_dir,
    "_out_json": metadata_json,
    "recursive": recursive,
    "full_path": recursive,
    "clean_caption": clean_caption
}

bucketing_config = {
    "_train_data_dir": filtered_dir,
    "_in_json": metadata_json,
    "_out_json": bucketing_json,
    "_model_name_or_path": vae_path if vae_path else model_path,
    "recursive": recursive,
    "full_path": recursive,
    "flip_aug": flip_aug,
    "skip_existing": skip_existing,
    "batch_size": 4,
    "max_data_loader_n_workers": 2,
    "max_resolution": f"{bucket_resolution}, {bucket_resolution}",
    "mixed_precision": mixed_precision,
}

def generate_args(config):
    """Render a config mapping as one command-line argument string.

    * keys starting with "_" -> the value alone, wrapped in double quotes
    * string values          -> --key="value"
    * True                   -> --key
    * int / float values     -> --key=value

    False booleans and values of any other type are left out. The pieces are
    separated by single spaces.
    """
    pieces = []
    for key, value in config.items():
        if key.startswith("_"):
            pieces.append(f'"{value}"')
        elif isinstance(value, str):
            pieces.append(f'--{key}="{value}"')
        elif isinstance(value, bool):
            # bool is tested before the numeric case because it subclasses int.
            if value:
                pieces.append(f"--{key}")
        elif isinstance(value, (float, int)):
            pieces.append(f"--{key}={value}")
    return " ".join(pieces).strip()

merge_metadata_args = generate_args(metadata_config)
prepare_buckets_args = generate_args(bucketing_config)

merge_metadata_command = f"python merge_all_to_metadata.py {merge_metadata_args}"
prepare_buckets_command = f"python prepare_buckets_latents.py {prepare_buckets_args}"

os.chdir(finetune_dir)
!{merge_metadata_command}
time.sleep(1)
!{prepare_buckets_command}


In [None]:
#@title Zip Safe Dataset

import os
import zipfile

zip_type = "safe" #@param ["explicit", "safe", "suggestive"]
source_folder = "/content/filtered/" #@param {type:"string"}
# os.path.join gives the right path whether or not the form value ends with
# "/"; plain concatenation turned "/content/filtered" into
# "/content/filteredsafe".
source_folder_to_zip = os.path.join(source_folder, zip_type)
output_dir = "/content/LoRA" #@param {type:"string"}
output_name = zip_type
#zip_part = "[x]" #@param {type:"string"}

# part_data and prefix_to_add are not assigned in this cell; they are expected
# to exist already (presumably from earlier cells — confirm).
if output_name:
    output_name = part_data + "_" + zip_type
else:
    output_name = prefix_to_add

output_path_for_zip = os.path.join(output_dir, f'{output_name}.zip')
# Remembered under a type-specific name so the matching push cell can find it.
safe_dataset_zip = output_path_for_zip

def zip_folder_contents(folder_path, output_zip_path):
    """Pack every file below ``folder_path`` into a deflate-compressed zip.

    Args:
        folder_path: Directory whose files are archived. Entry names are
            stored relative to this directory.
        output_zip_path: Path of the archive to create (overwritten if it
            exists). Missing parent directories are created.

    If the archive is located inside ``folder_path`` it is skipped, so it is
    never packed into itself.
    """
    output_parent = os.path.dirname(output_zip_path)
    if output_parent:
        os.makedirs(output_parent, exist_ok=True)

    archive_realpath = os.path.realpath(output_zip_path)
    with zipfile.ZipFile(output_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(folder_path):
            for file in files:
                file_path = os.path.join(root, file)
                if os.path.realpath(file_path) == archive_realpath:
                    continue  # the archive being written right now
                arcname = os.path.relpath(file_path, folder_path)
                zipf.write(file_path, arcname)

if __name__ == "__main__":
    # Note: this rebinds the `source_folder` form variable to the full path
    # of the folder being zipped.
    source_folder, output_zip = source_folder_to_zip, output_path_for_zip

    zip_folder_contents(source_folder, output_zip)
    print("Folder contents compressed successfully.")


In [None]:
#@title Push Safe Zip to Huggingface

import os
from huggingface_hub import upload_file, create_repo

file_path = "" #@param {type:"string"}
prefix = "" #@param {type:"string"}

# SECURITY: never store an access token in the notebook. The token that used
# to be hard-coded here was exposed and should be revoked. Leave the field
# empty to use the HF_TOKEN environment variable instead.
hf_token = "" #@param {type:"string"}
repo_id = 'Alterneko/a' #@param {type:"string"}
repo_type = 'dataset' #@param ['model','dataset']
commit_message = 'Upload' #@param {type:"string"}
make_private = False #@param {type:"boolean"}

version_data = "v3" #@param {type:"string"}
zip_type = "processed" #@param ['raw','processed','bucket']

# With token=None huggingface_hub is expected to fall back to locally stored
# credentials (see its documentation).
hf_token = hf_token or os.environ.get("HF_TOKEN") or None

# An empty field means "upload the archive built by the Zip Safe Dataset cell".
if file_path == "":
    file_path = safe_dataset_zip

# The file name must be known before the default commit message is built;
# it used to be read there before being assigned.
filename = os.path.basename(file_path)
if not commit_message:
    commit_message = f'Push {filename}'

version = f'{version_data}/' if version_data else ""

final_filename = f'{filename}'
path_in_repo = f'{zip_type}/{version}{final_filename}'

# The upload path stays in file_path: `model_path` is deliberately not
# reassigned here because the bucketing cells read it as the model fallback.
create_repo(repo_id=repo_id, repo_type=repo_type, token=hf_token, exist_ok=True, private=make_private)
upload_file(path_or_fileobj=file_path, path_in_repo=path_in_repo, repo_id=repo_id, repo_type=repo_type, commit_message=commit_message, token=hf_token)


In [None]:
#@title Push to HF.co

import os
from huggingface_hub import upload_file, create_repo

file_path = "/content/meta/safe_meta_lat.json" #@param {type:"string"}
# SECURITY: never store an access token in the notebook. The token that used
# to be hard-coded here was exposed and should be revoked. Leave the field
# empty to use the HF_TOKEN environment variable instead.
hf_token = "" #@param {type:"string"}
repo_id = 'Alterneko/a' #@param {type:"string"}
repo_type = 'dataset' #@param ['model','dataset']
commit_message = 'Update' #@param {type:"string"}
make_private = False #@param {type:"boolean"}

version_data = "v3" #@param {type:"string"}
zip_type = "bucket" #@param ['raw','processed','bucket']

# With token=None huggingface_hub is expected to fall back to locally stored
# credentials (see its documentation).
hf_token = hf_token or os.environ.get("HF_TOKEN") or None

# The file name must be known before the default commit message is built;
# it used to be read there before being assigned.
filename = os.path.basename(file_path)
if not commit_message:
    commit_message = f'Push {filename}'

version = f'{version_data}/' if version_data else ""

# part_data is not assigned in this cell; it is expected to exist already.
final_filename = f'{part_data}_{filename}'
path_in_repo = f'{zip_type}/{version}{final_filename}'

# The upload path stays in file_path: `model_path` is deliberately not
# reassigned here because the bucketing cells read it as the model fallback.
create_repo(repo_id=repo_id, repo_type=repo_type, token=hf_token, exist_ok=True, private=make_private)
upload_file(path_or_fileobj=file_path, path_in_repo=path_in_repo, repo_id=repo_id, repo_type=repo_type, commit_message=commit_message, token=hf_token)


In [None]:
# @title ## **3.4. Bucketing and Latents Caching**
%store -r

data_type = "explicit" #@param ["explicit", "safe", "suggestive"]

%mkdir "/content/meta/"
meta_dir = "/content/meta/"

filtered_dir = "/content/filtered/" + data_type
# @markdown This code will create buckets based on the `bucket_resolution` provided for multi-aspect ratio training, and then convert all images within the `train_data_dir` to latents.
bucketing_json    = os.path.join(meta_dir, f"{data_type}_meta_lat.json")
metadata_json     = os.path.join(meta_dir, f"{data_type}_meta_clean.json")
bucket_resolution = 512  # @param {type:"slider", min:512, max:1024, step:128}
mixed_precision   = "no"  # @param ["no", "fp16", "bf16"] {allow-input: false}
skip_existing     = False  # @param{type:"boolean"}
flip_aug          = True  # @param{type:"boolean"}
# @markdown Use `clean_caption` option to clean such as duplicate tags, `women` to `girl`, etc
clean_caption     = True #@param {type:"boolean"}
#@markdown Use the `recursive` option to process subfolders as well
recursive         = False #@param {type:"boolean"}

metadata_config = {
    "_train_data_dir": filtered_dir,
    "_out_json": metadata_json,
    "recursive": recursive,
    "full_path": recursive,
    "clean_caption": clean_caption
}

bucketing_config = {
    "_train_data_dir": filtered_dir,
    "_in_json": metadata_json,
    "_out_json": bucketing_json,
    "_model_name_or_path": vae_path if vae_path else model_path,
    "recursive": recursive,
    "full_path": recursive,
    "flip_aug": flip_aug,
    "skip_existing": skip_existing,
    "batch_size": 4,
    "max_data_loader_n_workers": 2,
    "max_resolution": f"{bucket_resolution}, {bucket_resolution}",
    "mixed_precision": mixed_precision,
}

def generate_args(config):
    """Render a config mapping as one command-line argument string.

    * keys starting with "_" -> the value alone, wrapped in double quotes
    * string values          -> --key="value"
    * True                   -> --key
    * int / float values     -> --key=value

    False booleans and values of any other type are left out. The pieces are
    separated by single spaces.
    """
    pieces = []
    for key, value in config.items():
        if key.startswith("_"):
            pieces.append(f'"{value}"')
        elif isinstance(value, str):
            pieces.append(f'--{key}="{value}"')
        elif isinstance(value, bool):
            # bool is tested before the numeric case because it subclasses int.
            if value:
                pieces.append(f"--{key}")
        elif isinstance(value, (float, int)):
            pieces.append(f"--{key}={value}")
    return " ".join(pieces).strip()

merge_metadata_args = generate_args(metadata_config)
prepare_buckets_args = generate_args(bucketing_config)

merge_metadata_command = f"python merge_all_to_metadata.py {merge_metadata_args}"
prepare_buckets_command = f"python prepare_buckets_latents.py {prepare_buckets_args}"

os.chdir(finetune_dir)
!{merge_metadata_command}
time.sleep(1)
!{prepare_buckets_command}


In [None]:
#@title Zip Explicit Dataset

import os
import zipfile

zip_type = "explicit" #@param ["explicit", "safe", "suggestive"]
source_folder = "/content/filtered/" #@param {type:"string"}
# os.path.join gives the right path whether or not the form value ends with
# "/"; plain concatenation turned "/content/filtered" into
# "/content/filteredexplicit".
source_folder_to_zip = os.path.join(source_folder, zip_type)
output_dir = "/content/LoRA" #@param {type:"string"}
output_name = zip_type
#zip_part = "[x]" #@param {type:"string"}

# part_data and prefix_to_add are not assigned in this cell; they are expected
# to exist already (presumably from earlier cells — confirm).
if output_name:
    output_name = part_data + "_" + zip_type
else:
    output_name = prefix_to_add

output_path_for_zip = os.path.join(output_dir, f'{output_name}.zip')
# Remembered under a type-specific name so the matching push cell can find it.
explicit_dataset_zip = output_path_for_zip

def zip_folder_contents(folder_path, output_zip_path):
    """Pack every file below ``folder_path`` into a deflate-compressed zip.

    Args:
        folder_path: Directory whose files are archived. Entry names are
            stored relative to this directory.
        output_zip_path: Path of the archive to create (overwritten if it
            exists). Missing parent directories are created.

    If the archive is located inside ``folder_path`` it is skipped, so it is
    never packed into itself.
    """
    output_parent = os.path.dirname(output_zip_path)
    if output_parent:
        os.makedirs(output_parent, exist_ok=True)

    archive_realpath = os.path.realpath(output_zip_path)
    with zipfile.ZipFile(output_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(folder_path):
            for file in files:
                file_path = os.path.join(root, file)
                if os.path.realpath(file_path) == archive_realpath:
                    continue  # the archive being written right now
                arcname = os.path.relpath(file_path, folder_path)
                zipf.write(file_path, arcname)

if __name__ == "__main__":
    # Note: this rebinds the `source_folder` form variable to the full path
    # of the folder being zipped.
    source_folder, output_zip = source_folder_to_zip, output_path_for_zip

    zip_folder_contents(source_folder, output_zip)
    print("Folder contents compressed successfully.")


In [None]:
#@title Push Explicit Zip to Huggingface

import os
from huggingface_hub import upload_file, create_repo

file_path = "" #@param {type:"string"}
prefix = "" #@param {type:"string"}

# SECURITY: never store an access token in the notebook. The token that used
# to be hard-coded here was exposed and should be revoked. Leave the field
# empty to use the HF_TOKEN environment variable instead.
hf_token = "" #@param {type:"string"}
repo_id = 'Alterneko/a' #@param {type:"string"}
repo_type = 'dataset' #@param ['model','dataset']
commit_message = 'Upload' #@param {type:"string"}
make_private = False #@param {type:"boolean"}

version_data = "v3" #@param {type:"string"}
zip_type = "processed" #@param ['raw','processed','bucket']

# With token=None huggingface_hub is expected to fall back to locally stored
# credentials (see its documentation).
hf_token = hf_token or os.environ.get("HF_TOKEN") or None

# An empty field means "upload the archive built by the Zip Explicit Dataset cell".
if file_path == "":
    file_path = explicit_dataset_zip

# The file name must be known before the default commit message is built;
# it used to be read there before being assigned.
filename = os.path.basename(file_path)
if not commit_message:
    commit_message = f'Push {filename}'

version = f'{version_data}/' if version_data else ""

final_filename = f'{filename}'
path_in_repo = f'{zip_type}/{version}{final_filename}'

# The upload path stays in file_path: `model_path` is deliberately not
# reassigned here because the bucketing cells read it as the model fallback.
create_repo(repo_id=repo_id, repo_type=repo_type, token=hf_token, exist_ok=True, private=make_private)
upload_file(path_or_fileobj=file_path, path_in_repo=path_in_repo, repo_id=repo_id, repo_type=repo_type, commit_message=commit_message, token=hf_token)


In [None]:
#@title Push to HF.co

import os
from huggingface_hub import upload_file, create_repo

file_path = "/content/meta/explicit_meta_lat.json" #@param {type:"string"}
# SECURITY: never store an access token in the notebook. The token that used
# to be hard-coded here was exposed and should be revoked. Leave the field
# empty to use the HF_TOKEN environment variable instead.
hf_token = "" #@param {type:"string"}
repo_id = 'Alterneko/a' #@param {type:"string"}
repo_type = 'dataset' #@param ['model','dataset']
commit_message = 'Update' #@param {type:"string"}
make_private = False #@param {type:"boolean"}

version_data = "v3" #@param {type:"string"}
zip_type = "bucket" #@param ['raw','processed','bucket']

# With token=None huggingface_hub is expected to fall back to locally stored
# credentials (see its documentation).
hf_token = hf_token or os.environ.get("HF_TOKEN") or None

# The file name must be known before the default commit message is built;
# it used to be read there before being assigned.
filename = os.path.basename(file_path)
if not commit_message:
    commit_message = f'Push {filename}'

version = f'{version_data}/' if version_data else ""

# part_data is not assigned in this cell; it is expected to exist already.
final_filename = f'{part_data}_{filename}'
path_in_repo = f'{zip_type}/{version}{final_filename}'

# The upload path stays in file_path: `model_path` is deliberately not
# reassigned here because the bucketing cells read it as the model fallback.
create_repo(repo_id=repo_id, repo_type=repo_type, token=hf_token, exist_ok=True, private=make_private)
upload_file(path_or_fileobj=file_path, path_in_repo=path_in_repo, repo_id=repo_id, repo_type=repo_type, commit_message=commit_message, token=hf_token)


In [None]:
# @title ## **3.4. Bucketing and Latents Caching**
%store -r

data_type = "suggestive" #@param ["explicit", "safe", "suggestive"]

%mkdir "/content/meta/"
meta_dir = "/content/meta/"

filtered_dir = "/content/filtered/" + data_type
# @markdown This code will create buckets based on the `bucket_resolution` provided for multi-aspect ratio training, and then convert all images within the `train_data_dir` to latents.
bucketing_json    = os.path.join(meta_dir, f"{data_type}_meta_lat.json")
metadata_json     = os.path.join(meta_dir, f"{data_type}_meta_clean.json")
bucket_resolution = 512  # @param {type:"slider", min:512, max:1024, step:128}
mixed_precision   = "no"  # @param ["no", "fp16", "bf16"] {allow-input: false}
skip_existing     = False  # @param{type:"boolean"}
flip_aug          = True  # @param{type:"boolean"}
# @markdown Use `clean_caption` option to clean such as duplicate tags, `women` to `girl`, etc
clean_caption     = True #@param {type:"boolean"}
#@markdown Use the `recursive` option to process subfolders as well
recursive         = False #@param {type:"boolean"}

metadata_config = {
    "_train_data_dir": filtered_dir,
    "_out_json": metadata_json,
    "recursive": recursive,
    "full_path": recursive,
    "clean_caption": clean_caption
}

bucketing_config = {
    "_train_data_dir": filtered_dir,
    "_in_json": metadata_json,
    "_out_json": bucketing_json,
    "_model_name_or_path": vae_path if vae_path else model_path,
    "recursive": recursive,
    "full_path": recursive,
    "flip_aug": flip_aug,
    "skip_existing": skip_existing,
    "batch_size": 4,
    "max_data_loader_n_workers": 2,
    "max_resolution": f"{bucket_resolution}, {bucket_resolution}",
    "mixed_precision": mixed_precision,
}

def generate_args(config):
    """Render a config mapping as one command-line argument string.

    * keys starting with "_" -> the value alone, wrapped in double quotes
    * string values          -> --key="value"
    * True                   -> --key
    * int / float values     -> --key=value

    False booleans and values of any other type are left out. The pieces are
    separated by single spaces.
    """
    pieces = []
    for key, value in config.items():
        if key.startswith("_"):
            pieces.append(f'"{value}"')
        elif isinstance(value, str):
            pieces.append(f'--{key}="{value}"')
        elif isinstance(value, bool):
            # bool is tested before the numeric case because it subclasses int.
            if value:
                pieces.append(f"--{key}")
        elif isinstance(value, (float, int)):
            pieces.append(f"--{key}={value}")
    return " ".join(pieces).strip()

merge_metadata_args = generate_args(metadata_config)
prepare_buckets_args = generate_args(bucketing_config)

merge_metadata_command = f"python merge_all_to_metadata.py {merge_metadata_args}"
prepare_buckets_command = f"python prepare_buckets_latents.py {prepare_buckets_args}"

os.chdir(finetune_dir)
!{merge_metadata_command}
time.sleep(1)
!{prepare_buckets_command}


In [None]:
#@title Zip Suggestive Dataset

import os
import zipfile

zip_type = "suggestive" #@param ["explicit", "safe", "suggestive"]
source_folder = "/content/filtered/" #@param {type:"string"}
# os.path.join gives the right path whether or not the form value ends with
# "/"; plain concatenation turned "/content/filtered" into
# "/content/filteredsuggestive".
source_folder_to_zip = os.path.join(source_folder, zip_type)
output_dir = "/content/LoRA" #@param {type:"string"}
output_name = zip_type
zip_part = "[x]" #@param {type:"string"}

# part_data and prefix_to_add are not assigned in this cell; they are expected
# to exist already (presumably from earlier cells — confirm).
if output_name:
    output_name = part_data + "_" + zip_type
else:
    output_name = prefix_to_add

output_path_for_zip = os.path.join(output_dir, f'{output_name}.zip')
# Remembered under a type-specific name so the matching push cell can find it.
suggestive_dataset_zip = output_path_for_zip

def zip_folder_contents(folder_path, output_zip_path):
    """Pack every file below ``folder_path`` into a deflate-compressed zip.

    Args:
        folder_path: Directory whose files are archived. Entry names are
            stored relative to this directory.
        output_zip_path: Path of the archive to create (overwritten if it
            exists). Missing parent directories are created.

    If the archive is located inside ``folder_path`` it is skipped, so it is
    never packed into itself.
    """
    output_parent = os.path.dirname(output_zip_path)
    if output_parent:
        os.makedirs(output_parent, exist_ok=True)

    archive_realpath = os.path.realpath(output_zip_path)
    with zipfile.ZipFile(output_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(folder_path):
            for file in files:
                file_path = os.path.join(root, file)
                if os.path.realpath(file_path) == archive_realpath:
                    continue  # the archive being written right now
                arcname = os.path.relpath(file_path, folder_path)
                zipf.write(file_path, arcname)

if __name__ == "__main__":
    # Note: this rebinds the `source_folder` form variable to the full path
    # of the folder being zipped.
    source_folder, output_zip = source_folder_to_zip, output_path_for_zip

    zip_folder_contents(source_folder, output_zip)
    print("Folder contents compressed successfully.")


In [None]:
#@title Push Suggestive Zip to Huggingface

import os
from huggingface_hub import upload_file, create_repo

file_path = "" #@param {type:"string"}
prefix = "" #@param {type:"string"}

# SECURITY: never store an access token in the notebook. The token that used
# to be hard-coded here was exposed and should be revoked. Leave the field
# empty to use the HF_TOKEN environment variable instead.
hf_token = "" #@param {type:"string"}
repo_id = 'Alterneko/a' #@param {type:"string"}
repo_type = 'dataset' #@param ['model','dataset']
commit_message = 'Upload' #@param {type:"string"}
make_private = False #@param {type:"boolean"}

version_data = "v3" #@param {type:"string"}
zip_type = "processed" #@param ['raw','processed','bucket']

# With token=None huggingface_hub is expected to fall back to locally stored
# credentials (see its documentation).
hf_token = hf_token or os.environ.get("HF_TOKEN") or None

# An empty field means "upload the archive built by the Zip Suggestive Dataset cell".
if file_path == "":
    file_path = suggestive_dataset_zip

# The file name must be known before the default commit message is built;
# it used to be read there before being assigned.
filename = os.path.basename(file_path)
if not commit_message:
    commit_message = f'Push {filename}'

version = f'{version_data}/' if version_data else ""

final_filename = f'{filename}'
path_in_repo = f'{zip_type}/{version}{final_filename}'

# The upload path stays in file_path: `model_path` is deliberately not
# reassigned here because the bucketing cells read it as the model fallback.
create_repo(repo_id=repo_id, repo_type=repo_type, token=hf_token, exist_ok=True, private=make_private)
upload_file(path_or_fileobj=file_path, path_in_repo=path_in_repo, repo_id=repo_id, repo_type=repo_type, commit_message=commit_message, token=hf_token)


In [None]:
#@title Push to HF.co

import os
from huggingface_hub import upload_file, create_repo

file_path = "/content/meta/suggestive_meta_lat.json" #@param {type:"string"}

# SECURITY: never store an access token in the notebook. The token that used
# to be hard-coded here was exposed and should be revoked. Leave the field
# empty to use the HF_TOKEN environment variable instead.
hf_token = "" #@param {type:"string"}
repo_id = 'Alterneko/a' #@param {type:"string"}
repo_type = 'dataset' #@param ['model','dataset']
commit_message = 'Update' #@param {type:"string"}
make_private = False #@param {type:"boolean"}

version_data = "v3" #@param {type:"string"}
zip_type = "bucket" #@param ['raw','processed','bucket']

# With token=None huggingface_hub is expected to fall back to locally stored
# credentials (see its documentation).
hf_token = hf_token or os.environ.get("HF_TOKEN") or None

# The file name must be known before the default commit message is built;
# it used to be read there before being assigned.
filename = os.path.basename(file_path)
if not commit_message:
    commit_message = f'Push {filename}'

version = f'{version_data}/' if version_data else ""

# part_data is not assigned in this cell; it is expected to exist already.
final_filename = f'{part_data}_{filename}'
path_in_repo = f'{zip_type}/{version}{final_filename}'

# The upload path stays in file_path: `model_path` is deliberately not
# reassigned here because the bucketing cells read it as the model fallback.
create_repo(repo_id=repo_id, repo_type=repo_type, token=hf_token, exist_ok=True, private=make_private)
upload_file(path_or_fileobj=file_path, path_in_repo=path_in_repo, repo_id=repo_id, repo_type=repo_type, commit_message=commit_message, token=hf_token)


In [None]:
#@title Zip lowres Folder

import os
import zipfile

source_folder = "/content/LoRA/lowres_Data" #@param {type:"string"}
source_folder_to_zip = source_folder
output_dir = "/content/LoRA" #@param {type:"string"}
output_name = "lowres" #@param {type:"string"}
#zip_part = "[x]" #@param {type:"string"}

# A non-empty name gets the part suffix appended; an empty one falls back to
# prefix_to_add. Neither part_data nor prefix_to_add is assigned in this cell.
output_name = output_name + "_" + part_data if output_name else prefix_to_add

# The archive path is also kept under a dedicated name for the push cell.
lowres_dataset_zip = output_path_for_zip = f'{output_dir}/{output_name}.zip'

def zip_folder_contents(folder_path, output_zip_path):
    """Pack every file below ``folder_path`` into a deflate-compressed zip.

    Args:
        folder_path: Directory whose files are archived. Entry names are
            stored relative to this directory.
        output_zip_path: Path of the archive to create (overwritten if it
            exists). Missing parent directories are created.

    If the archive is located inside ``folder_path`` it is skipped, so it is
    never packed into itself.
    """
    output_parent = os.path.dirname(output_zip_path)
    if output_parent:
        os.makedirs(output_parent, exist_ok=True)

    archive_realpath = os.path.realpath(output_zip_path)
    with zipfile.ZipFile(output_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(folder_path):
            for file in files:
                file_path = os.path.join(root, file)
                if os.path.realpath(file_path) == archive_realpath:
                    continue  # the archive being written right now
                arcname = os.path.relpath(file_path, folder_path)
                zipf.write(file_path, arcname)

if __name__ == "__main__":
    # For this cell source_folder_to_zip is the same path as source_folder,
    # so the rebinding below does not change its value.
    source_folder, output_zip = source_folder_to_zip, output_path_for_zip

    zip_folder_contents(source_folder, output_zip)
    print("Folder contents compressed successfully.")


In [None]:
#@title Push Lowres Zip to HF.co

import os
from huggingface_hub import upload_file, create_repo

file_path = "" #@param {type:"string"}

prefix = "" #@param {type:"string"}

# SECURITY: never store an access token in the notebook. The token that used
# to be hard-coded here was exposed and should be revoked. Leave the field
# empty to use the HF_TOKEN environment variable instead.
hf_token = "" #@param {type:"string"}
repo_id = 'Alterneko/a' #@param {type:"string"}
repo_type = 'dataset' #@param ['model','dataset']
commit_message = 'Upload' #@param {type:"string"}
make_private = False #@param {type:"boolean"}

version_data = "v3" #@param {type:"string"}
zip_type = "processed" #@param ['raw','processed','bucket']

# With token=None huggingface_hub is expected to fall back to locally stored
# credentials (see its documentation).
hf_token = hf_token or os.environ.get("HF_TOKEN") or None

# An empty field means "upload the archive built by the Zip lowres Folder cell".
if file_path == "":
    file_path = lowres_dataset_zip

# The file name must be known before the default commit message is built;
# it used to be read there before being assigned.
filename = os.path.basename(file_path)
if not commit_message:
    commit_message = f'Push {filename}'

version = f'{version_data}/' if version_data else ""

# NOTE(review): with an empty `prefix` the uploaded name starts with "_";
# kept as is because existing repository paths may rely on it.
final_filename = f'{prefix}_{filename}'
path_in_repo = f'{zip_type}/{version}{final_filename}'

# The upload path stays in file_path: `model_path` is deliberately not
# reassigned here because the bucketing cells read it as the model fallback.
create_repo(repo_id=repo_id, repo_type=repo_type, token=hf_token, exist_ok=True, private=make_private)
upload_file(path_or_fileobj=file_path, path_in_repo=path_in_repo, repo_id=repo_id, repo_type=repo_type, commit_message=commit_message, token=hf_token)


In [None]:
#@title Clear Folder

import os
import shutil

def clear_folder_contents(folder_path):
    """Delete everything inside ``folder_path`` but keep the folder itself.

    Regular files and symbolic links are unlinked; real sub-directories are
    removed recursively. A link's target is never touched.
    """
    for item in os.listdir(folder_path):
        item_path = os.path.join(folder_path, item)
        # Links are checked first: shutil.rmtree refuses to run on a symlink
        # to a directory, and a broken link is neither a file nor a directory
        # and would otherwise be left behind.
        if os.path.islink(item_path) or os.path.isfile(item_path):
            os.unlink(item_path)
        elif os.path.isdir(item_path):
            shutil.rmtree(item_path)

# Set 'folder_path' to the folder whose contents should be deleted.
folder_path = '/content/LoRA/train_data' #@param {type:"string"}
clear_folder_contents(folder_path)
# The first message is Indonesian for "The contents of folder … have been deleted."
print(f"Isi folder {folder_path} telah dihapus.", "All Done!", sep="\n")
