<a href="https://colab.research.google.com/github/Linaqruf/sd-notebook-collection/blob/main/sd-colab-toolkit.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# I. Installation

In [None]:
#@title ## 1.1 Install Dependencies
#@markdown This will install required Python packages
import os
import zipfile
import shutil

# Workspace layout: every working directory hangs off the Colab content root.
root_dir = "/content"
repo_dir, models_dir, vaes_dir, deps_dir = (
    os.path.join(root_dir, folder)
    for folder in ("sd-scripts", "models", "vae", "deps")
)
tools_dir = os.path.join(repo_dir, "tools")

# Both archives are served from the same Hugging Face repository.
_fast_repo_base = "https://huggingface.co/Linaqruf/fast-repo/resolve/main"
ubuntu_deps_url = f"{_fast_repo_base}/ubuntu-deps.zip"
ram_patch = f"{_fast_repo_base}/ram_patch.zip"

def ubuntu_deps(url, dst):
    os.makedirs(dst, exist_ok=True)
    filename = os.path.basename(url)
    !wget -q --show-progress {url}
    with zipfile.ZipFile(filename, "r") as deps:
        deps.extractall(dst)
    !dpkg -i {dst}/*
    os.remove(filename)
    shutil.rmtree(dst)
    
def install_dependencies():
    """Install the Python packages used by the cloned repository.

    Changes the working directory to ``repo_dir`` (it stays changed after the
    call), upgrades everything listed in that directory's ``requirements.txt``
    and then upgrades ``gdown``.
    """
    os.chdir(repo_dir)
    # The relative requirements path resolves against repo_dir because of the chdir above.
    !pip install --upgrade -r requirements.txt
    !pip install --upgrade --no-cache-dir gdown

def main():
    for dir in [models_dir, vaes_dir]:
        os.makedirs(dir, exist_ok=True)

    !apt -y update -qq
    !apt install libunwind8-dev -yqq
    
    for url in [ubuntu_deps_url, ram_patch]:
        ubuntu_deps(url, deps_dir)
    os.environ["LD_PRELOAD"] = "libtcmalloc.so"

    if not os.path.isdir(repo_dir):
      !git clone https://github.com/kohya-ss/sd-scripts

    install_dependencies()

main()


In [None]:
# @title ## 2.2. Download Custom Model
import os

# Reload every variable previously persisted with IPython's `%store` magic.
# NOTE(review): no `%store <name>` call is visible in this part of the notebook, so
# `root_dir` / `models_dir` presumably come from cell 1.1 run in the same kernel - confirm.
%store -r

# Work from the workspace root.
os.chdir(root_dir)

# @markdown Fill in the URL fields with the links to the files you want to download. Separate multiple URLs with a comma.
# @markdown Example: `url1, url2, url3`
modelUrls = ""  # @param {'type': 'string'}

def install(url):
    base_name = os.path.basename(url)

    if "drive.google.com" in url:
        os.chdir(models_dir)
        !gdown --fuzzy {url}
    elif "huggingface.co" in url:
        if "/blob/" in url:
            url = url.replace("/blob/", "/resolve/")
        # @markdown Change this part with your own huggingface token if you need to download your private model
        hf_token = "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"  # @param {type:"string"}
        user_header = f'"Authorization: Bearer {hf_token}"'
        !aria2c --console-log-level=error --summary-interval=10 --header={user_header} -c -x 16 -k 1M -s 16 -d {models_dir} -o {base_name} {url}
    else:
        !aria2c --console-log-level=error --summary-interval=10 -c -x 16 -k 1M -s 16 -d {models_dir} {url}

# Split the comma-separated field and ignore blank entries (for example a
# trailing comma), which would otherwise start a download with an empty URL.
if modelUrls:
    urls = [entry.strip() for entry in modelUrls.split(",") if entry.strip()]
    for url in urls:
        install(url)


In [None]:
# @title ## 2.3. Download Available VAE (Optional)
import os

# Reload every variable previously persisted with IPython's `%store` magic.
# NOTE(review): no `%store <name>` call is visible in this part of the notebook, so
# `root_dir` / `vaes_dir` presumably come from cell 1.1 run in the same kernel - confirm.
%store -r

# Work from the workspace root.
os.chdir(root_dir)

# Selectable VAE files: saved file name -> download URL ("" means nothing to download).
vaes = {
    "none": "",
    "anime.vae.pt": "https://huggingface.co/Linaqruf/personal-backup/resolve/main/vae/animevae.pt",
    "waifudiffusion.vae.pt": "https://huggingface.co/hakurei/waifu-diffusion-v1-4/resolve/main/vae/kl-f8-anime.ckpt",
    "stablediffusion.vae.pt": "https://huggingface.co/stabilityai/sd-vae-ft-mse-original/resolve/main/vae-ft-mse-840000-ema-pruned.ckpt",
}

# @markdown Select one of the VAEs to download, select `none` for not download VAE:
vae_name = "anime.vae.pt"  # @param ["none", "anime.vae.pt", "waifudiffusion.vae.pt", "stablediffusion.vae.pt"]

# Queue the chosen entry only when it maps to a non-empty URL.
vae_url = vaes.get(vae_name, "")
install_vaes = [(vae_name, vae_url)] if vae_url else []

def install(vae_name, url):
    hf_token = "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"
    user_header = f'"Authorization: Bearer {hf_token}"'
    !aria2c --console-log-level=error --summary-interval=10 --header={user_header} -c -x 16 -k 1M -s 16 -d {vaes_dir} -o {vae_name} "{url}"

def install_vae():
    """Download every queued ``(file name, url)`` pair in ``install_vaes``."""
    for file_name, source_url in install_vaes:
        install(file_name, source_url)

install_vae()


# II. Model Conversion

In [None]:
import os
# Reload every variable previously persisted with IPython's `%store` magic.
# NOTE(review): no `%store <name>` call is visible in this part of the notebook, so
# `tools_dir` presumably comes from cell 1.1 run in the same kernel - confirm.
%store -r
#@title ## 7.2. Model Pruner

os.chdir(tools_dir)

# Fetch the pruning script once; later runs reuse the copy already in tools_dir.
if not os.path.exists('prune.py'):
    !wget https://raw.githubusercontent.com/lopho/stable-diffusion-prune/main/prune.py

#@markdown Convert to Float16
fp16 = False #@param {'type':'boolean'}
#@markdown Use EMA for weights
ema = False #@param {'type':'boolean'}
#@markdown Strip CLIP weights
no_clip = False #@param {'type':'boolean'}
#@markdown Strip VAE weights
no_vae = False #@param {'type':'boolean'}
#@markdown Strip depth model weights
no_depth = False #@param {'type':'boolean'}
#@markdown Strip UNet weights
no_unet = False #@param {'type':'boolean'}

model_path = "" #@param {'type' : 'string'}

# Option name -> whether the user enabled it in the form above.
config = dict(
    fp16=fp16,
    ema=ema,
    no_clip=no_clip,
    no_vae=no_vae,
    no_depth=no_depth,
    no_unet=no_unet,
)

# Option name -> output-name suffix: a dash plus the option name with
# underscores turned into dashes, in the same order as `config`.
suffixes = {option: "-" + option.replace("_", "-") for option in config}

print(f"Loading model from {model_path}")

dir_name = os.path.dirname(model_path)
base_name = os.path.basename(model_path)
output_name = base_name.split('.')[0]

for option, suffix in suffixes.items():
    if config[option]:
        print(f"Applying option {option}")
        output_name += suffix
        
output_name += '-pruned'
output_path = os.path.join(dir_name, output_name + ('.ckpt' if model_path.endswith(".ckpt") else ".safetensors"))

args = ""
for k, v in config.items():
    if k.startswith("_"):
        args += f'"{v}" '
    elif isinstance(v, str):
        args += f'--{k}="{v}" '
    elif isinstance(v, bool) and v:
        args += f"--{k} "
    elif isinstance(v, float) and not isinstance(v, bool):
        args += f"--{k}={v} "
    elif isinstance(v, int) and not isinstance(v, bool):
        args += f"--{k}={v} "

final_args = f"python3 prune.py {model_path} {output_path} {args}"
!{final_args}

print(f"Saving pruned model to {output_path}")

In [None]:
#@title ## 7.1. Convert Diffusers to Checkpoint
import os
# Reload every variable previously persisted with IPython's `%store` magic.
# NOTE(review): no `%store <name>` call is visible in this part of the notebook, so
# `tools_dir` presumably comes from cell 1.1 run in the same kernel - confirm.
%store -r

# The conversion script is invoked by relative name, so run from the tools folder.
os.chdir(tools_dir)

#@markdown ### Conversion Config
model_to_load = "" #@param {'type': 'string'}
model_to_save = os.path.splitext(model_to_load)[0]
convert = "checkpoint_to_diffusers" #@param ["diffusers_to_checkpoint", "checkpoint_to_diffusers"] {'allow-input': false}
v2 = True #@param {type:'boolean'}
global_step = 0 #@param {'type': 'number'}
epoch = 0 #@param {'type': 'number'}
use_safetensors = True #@param {'type': 'boolean'}
save_precision_as = "--float" #@param ["--fp16","--bf16","--float"] {'allow-input': false}

#@markdown Additional option for diffusers
feature_extractor = True #@param {'type': 'boolean'}
safety_checker = True #@param {'type': 'boolean'}

reference_model = "stabilityai/stable-diffusion-2-1" if v2 else "runwayml/stable-diffusion-v1-5"
model_output = f"{model_to_save}.safetensors" if use_safetensors else f"{model_to_save}.ckpt"

# (file name, url) pairs: the first entry is the feature-extractor config,
# the remaining entries are the safety-checker files.
urls = [
    ("preprocessor_config.json", "https://huggingface.co/CompVis/stable-diffusion-safety-checker/resolve/main/preprocessor_config.json"),
    ("config.json", "https://huggingface.co/CompVis/stable-diffusion-safety-checker/resolve/main/config.json"),
    ("pytorch_model.bin", "https://huggingface.co/CompVis/stable-diffusion-safety-checker/resolve/main/pytorch_model.bin"),
]

# The form stores the precision with leading dashes ("--fp16", "--bf16",
# "--float"). Strip them so the value can be compared with "fp16" and passed
# as the value of --save_precision_as instead of looking like another flag.
precision_choice = save_precision_as.lstrip("-")

# Keys starting with "_" are emitted as positional arguments by convert_dict.
diffusers_to_sd_dict = {
    "_model_to_load": model_to_load,
    "_model_to_save": model_output,
    "global_step": global_step,
    "epoch": epoch,
    "save_precision_as": precision_choice,
}

sd_to_diffusers_dict = {
    "_model_to_load": model_to_load,
    "_model_to_save": model_to_save,
    "v2": bool(v2),
    "v1": not v2,
    "global_step": global_step,
    "epoch": epoch,
    "fp16": precision_choice == "fp16",
    "use_safetensors": use_safetensors,
    "reference_model": reference_model
}

def convert_dict(config):
    """Render an option mapping as a command-line argument string.

    Rules, applied per entry in insertion order:
      * key starting with "_": the value alone, double-quoted (positional);
      * string value: ``--key="value"``;
      * boolean value: ``--key`` when true, nothing when false;
      * other int or float value: ``--key=value``;
      * anything else is skipped.

    Every emitted argument is followed by a single space.
    """
    pieces = []
    for key, value in config.items():
        if key.startswith("_"):
            pieces.append(f'"{value}"')
        elif isinstance(value, str):
            pieces.append(f'--{key}="{value}"')
        elif isinstance(value, bool):
            # Booleans are handled before the numeric case because bool is an int subclass.
            if value:
                pieces.append(f"--{key}")
        elif isinstance(value, (float, int)):
            pieces.append(f"--{key}={value}")

    return "".join(piece + " " for piece in pieces)

def run_script(script_name, script_args):
    """Run ``python <script_name> <script_args>`` through the shell.

    ``script_args`` is pasted into the command line as-is, so it must already
    be a correctly quoted argument string.
    """
    !python {script_name} {script_args}

def download(output, url, save_dir):
    """Fetch ``url`` with ``aria2c`` and save it as ``save_dir/output``."""
    !aria2c --console-log-level=error --summary-interval=10 -c -x 16 -k 1M -s 16 -d '{save_dir}' -o '{output}' {url}

# Pre-render the argument strings for both conversion directions.
diffusers_to_sd_args = convert_dict(diffusers_to_sd_dict)
sd_to_diffusers_args = convert_dict(sd_to_diffusers_dict)

conversion_script = "convert_diffusers20_original_sd.py"
source_name = os.path.basename(model_to_load)
source_is_checkpoint = model_to_load.endswith(("ckpt","safetensors"))

if convert == "diffusers_to_checkpoint":
    # This direction needs a diffusers source, so a single-file checkpoint is rejected.
    if source_is_checkpoint:
        print(f"{source_name} is not in diffusers format")
    else:
        run_script(conversion_script, diffusers_to_sd_args)
elif not source_is_checkpoint:
    print(f"{source_name} is not in ckpt/safetensors format")
else:
    run_script(conversion_script, sd_to_diffusers_args)

    # Optional extras placed next to the converted model: (sub-folder, files to fetch).
    extras = []
    if feature_extractor:
        extras.append(("feature_extractor", urls[:1]))
    if safety_checker:
        extras.append(("safety_checker", urls[1:]))

    for subfolder, files in extras:
        save_dir = os.path.join(model_to_save, subfolder)
        os.makedirs(save_dir, exist_ok=True)
        for output, url in files:
            download(output, url, save_dir)

In [None]:
#@title 2.3. Replace VAE of Existing Model 

# merge_vae.py is invoked by relative name below, so run from the tools folder.
os.chdir(tools_dir)
# Fetch the helper script once; later runs reuse the copy already in tools_dir.
if not os.path.exists('merge_vae.py'):
  !wget https://raw.githubusercontent.com/Linaqruf/kohya-trainer/main/tools/merge_vae.py

#@markdown The input model must be a `.ckpt` file; `.safetensors` models won't work.

target_model = "" #@param {'type': 'string'}
target_vae = "/content/vae/anime.vae.pt" #@param {'type': 'string'}
use_safetensors = False #@param {type:'boolean'}
# get the base file name and directory
base_name = os.path.basename(target_model)
base_dir = os.path.dirname(target_model)

# get the file name without extension
file_name = os.path.splitext(base_name)[0]

# create the new file name
new_file_name = file_name + "-vae-swapped"

# get the file extension
file_ext = os.path.splitext(base_name)[1]

# create the output file path
output_model = os.path.join(base_dir, new_file_name + file_ext)

!python merge_vae.py \
    {target_model} \
    {target_vae} \
    {output_model}



In [None]:
#@title 2.4. Convert CKPT-2-Safetensors

import os
import torch
from safetensors.torch import load_file, save_file
from torch import load, save

model_path = "" #@param {type: 'string'}

def is_safetensors(path):
  """Return True when ``path`` has a ``.safetensors`` extension (case-insensitive)."""
  _, extension = os.path.splitext(path)
  return extension.lower() == '.safetensors'

def convert(model_path):
  """Convert a checkpoint between the pickle and safetensors formats.

  A ``.safetensors`` input is saved as ``.ckpt``; any other input is saved as
  ``.safetensors``. The output is written next to the input with the same
  file stem. When the loaded object has a ``state_dict`` entry, only that
  entry is saved.

  Args:
      model_path: Path of the model file to convert.

  Errors are caught and printed rather than raised.
  """
  print("Loading model:", os.path.basename(model_path))

  try:
    with torch.no_grad():
      print("Conversion in progress, please wait...")
      source_is_safetensors = is_safetensors(model_path)
      if source_is_safetensors:
        model = load_file(model_path, device="cpu")
      else:
        # SECURITY: torch.load unpickles the file, which can execute arbitrary
        # code. Only convert checkpoints obtained from a trusted source.
        model = load(model_path, map_location="cpu")

      sd = model['state_dict'] if 'state_dict' in model else model

      save_to = ".ckpt" if source_is_safetensors else ".safetensors"
      output = os.path.splitext(model_path)[0] + save_to

      if source_is_safetensors:
        save(sd, output)
      else:
        # safetensors stores tensors only and rejects non-contiguous ones, so
        # drop any other entries and make every tensor contiguous first.
        tensors = {
            key: value.contiguous()
            for key, value in sd.items()
            if isinstance(value, torch.Tensor)
        }
        skipped = len(sd) - len(tensors)
        if skipped:
          print(f'Skipped {skipped} non-tensor entries that safetensors cannot store')
        save_file(tensors, output)

    print(f'Successfully converted {os.path.basename(model_path)} to {os.path.basename(output)}')
    print(f'located in this path : {output}')
  except Exception as ex:
    print(f'ERROR converting {os.path.basename(model_path)}: {ex}')

  print('Done!')

def main():
  """Convert the model selected in the ``model_path`` form field."""
  convert(model_path)
main()


# VIII. Deployment

In [None]:
# @title ## 8.1. Upload Config
from huggingface_hub import login
from huggingface_hub import HfApi
from huggingface_hub.utils import validate_repo_id, HfHubHTTPError

# Form fields for the upload target. The repository id is built later as
# "<orgs_name or the logged-in user's name>/<repo_name>".
# @markdown Login to Huggingface Hub
# @markdown > Get **your** huggingface `WRITE` token [here](https://huggingface.co/settings/tokens)
write_token = ""  # @param {type:"string"}
# @markdown Fill this if you want to upload to your organization, or just leave it empty.
orgs_name = ""  # @param{type:"string"}
# @markdown If your model/dataset repo does not exist, it will automatically create it.
repo_name = "stolen"  # @param{type:"string"}
make_private = False  # @param{type:"boolean"}

def authenticate(write_token):
    """Log in to the Hugging Face Hub and return ``(account info, HfApi client)``.

    The token is also added to the git credential store.
    """
    login(write_token, add_to_git_credential=True)
    hub_client = HfApi()
    account_info = hub_client.whoami(write_token)
    return account_info, hub_client

def create_repo(api, user, orgs_name, repo_name, make_private=False):
    """Create the target model repository unless it already exists.

    Args:
        api: ``HfApi`` client used to create the repository.
        user: Account info mapping; its ``"name"`` entry is the owner when
            ``orgs_name`` is empty.
        orgs_name: Organization that should own the repository, or "".
        repo_name: Repository name (surrounding whitespace is removed).
        make_private: Whether a newly created repository is private.

    Returns:
        The repository id in ``owner/name`` form.

    Raises:
        HfHubHTTPError: For any Hub error other than "repository already
            exists" (for example an invalid token or missing write access).
    """
    owner = orgs_name.strip() or user["name"]
    repo_id = owner + "/" + repo_name.strip()

    validate_repo_id(repo_id)
    try:
        api.create_repo(repo_id=repo_id, private=make_private)
        print(f"Model repo '{repo_id}' didn't exist, creating repo")
    except HfHubHTTPError as e:
        # Only HTTP 409 (conflict) means the repository is already there; other
        # failures must surface instead of being reported as "exists".
        status = getattr(getattr(e, "response", None), "status_code", None)
        if status != 409:
            raise
        print(f"Model repo '{repo_id}' exists, skipping create repo")

    print(f"Model repo '{repo_id}' link: https://huggingface.co/{repo_id}\n")
    return repo_id

# Log in with the form's token and keep the account info and API client.
user, api = authenticate(write_token)

# Id of the (possibly newly created) repository that later upload cells read.
model_repo = create_repo(api, user, orgs_name, repo_name, make_private)


## 8.2. Upload with Huggingface Hub

In [None]:
#@title ### 8.2.1. Upload Model
from huggingface_hub import HfApi
from huggingface_hub.utils import validate_repo_id, HfHubHTTPError
from pathlib import Path

api = HfApi()

#@markdown This will be uploaded to model repo

model_path = "" #@param {type :"string"}
path_in_repo = "" #@param {type :"string"}
revision = "" #@param {type :"string"}
# Upload to the requested branch (created when missing) or fall back to "main".
if revision:
  api.create_branch(repo_id=model_repo,
                branch=revision,
                exist_ok=True)
else:
  revision = "main"
project_name = os.path.basename(model_path)
# NOTE(review): this compares the whole file name with bare extensions, so it
# only matches a file literally named ".safetensors", "ckpt" or "pt" - confirm the intent.
if project_name in [".safetensors", "ckpt", "pt"]:
  project_name = os.path.split(model_path)[0]
# @markdown Other Information
commit_message = ""  # @param {type :"string"}

# A default is filled in here, so the message handed to upload_model is never empty.
if not commit_message:
    commit_message = "feat: upload " + project_name + " checkpoint"

# Always define the layout flags: leaving them unset when model_path is
# missing made upload_model fail with a NameError on folder uploads.
model_exists = os.path.exists(model_path)
vae_exists = model_exists and os.path.exists(os.path.join(model_path, 'vae'))
unet_exists = model_exists and os.path.exists(os.path.join(model_path, 'unet'))
text_encoder_exists = model_exists and os.path.exists(os.path.join(model_path, 'text_encoder'))
    
def upload_model(model_paths, is_folder :bool, commit_message):
  """Upload a model file or folder to ``model_repo`` on branch ``revision``.

  Args:
      model_paths: Local path of the file or folder to upload.
      is_folder: True to upload a folder, False to upload a single file.
      commit_message: Commit message; when empty, a default that depends on
          what is uploaded is used.

  A folder containing ``vae``, ``unet`` and ``text_encoder`` sub-folders is
  uploaded to the repository root; any other folder, and any single file, is
  uploaded under ``path_in_repo`` when that is set, otherwise under the last
  component of ``model_paths``.
  """
  trained_model = path_in_repo if path_in_repo else Path(model_paths).parts[-1]

  print(f"Uploading {trained_model} to https://huggingface.co/"+model_repo)
  print("Please wait...")

  if is_folder:
    if vae_exists and unet_exists and text_encoder_exists:
      if not commit_message:
        commit_message = f"feat: upload diffusers version of {trained_model}"

      api.upload_folder(
          folder_path=model_paths,
          repo_id=model_repo,
          revision=revision,
          commit_message=commit_message,
          ignore_patterns=".ipynb_checkpoints"
          )

    else:
      if not commit_message:
        commit_message = f"feat: upload {trained_model} checkpoint folder"

      api.upload_folder(
          folder_path=model_paths,
          path_in_repo=trained_model,
          repo_id=model_repo,
          revision=revision,
          commit_message=commit_message,
          ignore_patterns=".ipynb_checkpoints"
          )
    # Link to the branch that was actually written instead of always "main".
    print("Upload success, located at https://huggingface.co/"+model_repo+"/tree/"+revision+"\n")
  else:
    if not commit_message:
      if model_paths.endswith(".safetensors"):
        commit_message = f"feat: upload safetensors version of {trained_model} "
      else:
        commit_message = f"feat: upload {trained_model} checkpoint"

    api.upload_file(
        path_or_fileobj=model_paths,
        path_in_repo=trained_model,
        repo_id=model_repo,
        revision=revision,
        commit_message=commit_message,
        )

    # Link to the branch that was actually written instead of always "main".
    print("Upload success, located at https://huggingface.co/"+model_repo+"/blob/"+revision+"/"+trained_model+"\n")
      
def upload():
    """Upload ``model_path``, as a single file or as a folder.

    Paths ending in ``.ckpt``, ``.safetensors`` or ``.pt`` are uploaded as one
    file; everything else is uploaded as a folder.

    Raises:
        FileNotFoundError: When ``model_path`` is empty or does not exist.
    """
    # Fail early with a clear message instead of an obscure error further down.
    if not model_path or not os.path.exists(model_path):
      raise FileNotFoundError(f"model_path does not exist: {model_path!r}")

    is_single_file = model_path.endswith((".ckpt", ".safetensors", ".pt"))
    upload_model(model_path, not is_single_file, commit_message)

upload()

## 8.3. Upload with GIT (Alternative)

In [None]:
#@title ### 8.3.1. Clone Repository
%cd /content/
clone_model = True #@param {'type': 'boolean'}

!git lfs install --skip-smudge
!export GIT_LFS_SKIP_SMUDGE=1

if clone_model:
  !git clone https://huggingface.co/{model_repo} /content/{repo_name}

In [None]:
#@title ### 8.3.2. Commit using Git 
import os

os.chdir(root_dir)

# NOTE(review): `commit_model` is not read anywhere in this cell - confirm whether it should gate the commit.
#@markdown Choose which repo you want to commit
commit_model = True #@param {'type': 'boolean'}
#@markdown #### Other Information
commit_message = "" #@param {type :"string"}

# Default message when the field is left empty.
if not commit_message:
  commit_message = f"feat: upload {repo_name}"

# Placeholder identity recorded as the author of the commit (written to the global git config).
!git config --global user.email "example@mail.com"
!git config --global user.name "example"

def commit(repo_folder, commit_message):
  os.chdir(os.path.join(root_dir, repo_folder))
  !git lfs install
  !huggingface-cli lfs-enable-largefiles .
  !git add .
  !git commit -m "{commit_message}"
  !git push

commit(repo_name, commit_message)
