<a href="https://colab.research.google.com/github/cmz97/stable-diff-playground/blob/main/sd-colab-toolkit.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# I. Installation

In [None]:
#@title ## 1.1 Install Dependencies
import os
import zipfile
import shutil

root_dir = "/content/"
repo_dir = f"{root_dir}/sd-scripts"
models_dir = f"{root_dir}/models"
vaes_dir = f"{root_dir}/vae"
deps_dir = f"{root_dir}/deps"
tools_dir = f"{root_dir}/sd-scripts/tools"

os.makedirs(models_dir, exist_ok=True)
os.makedirs(vaes_dir, exist_ok=True)

if not os.path.isdir(repo_dir):
  !git clone https://github.com/kohya-ss/sd-scripts

os.chdir(repo_dir)

#@markdown This will install required Python packages
!pip install --upgrade -r requirements.txt
!pip install --upgrade --no-cache-dir gdown

def ubuntu_deps(url, name, dst):
  !wget -q --show-progress {url}
  with zipfile.ZipFile(name, 'r') as deps:
    deps.extractall(dst)
  !dpkg -i {dst}/*
  os.remove(name)
  shutil.rmtree(dst)
ubuntu_deps("https://huggingface.co/Linaqruf/fast-repo/resolve/main/deb-libs.zip", "deb-libs.zip", deps_dir)



In [None]:
#@title ## 1.2. Download Custom Model

os.chdir(models_dir)

import os

#@markdown ### Custom model
modelList = []
modelUrl = "" #@param {'type': 'string'}
modelUrl2 = "" #@param {'type': 'string'}
modelUrl3 = "" #@param {'type': 'string'}
modelUrl4 = "" #@param {'type': 'string'}

modelList.extend([modelUrl, 
                  modelUrl2,
                  modelUrl3,
                  modelUrl4])

def install_aria():
  if not os.path.exists('/usr/bin/aria2c'):
    !apt install -y -qq aria2

def install(url):


  if url.startswith("https://drive.google.com"):
    !gdown --fuzzy  "{url}"
  elif url.startswith("magnet:?"):
    install_aria()
    !aria2c --summary-interval=10 -c -x 10 -k 1M -s 10 "{url}"
  elif url.startswith("https://huggingface.co/"):
    base_name = os.path.basename(url)
    if '/blob/' in url:
      url = url.replace('/blob/', '/resolve/')

    hf_token = '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'
    user_header = f"\"Authorization: Bearer {hf_token}\""
    !aria2c --console-log-level=error --summary-interval=10 --header={user_header} -c -x 16 -k 1M -s 16 -d {models_dir} -o "{base_name}" "{url}"
  else:
    !aria2c --console-log-level=error --summary-interval=10 -c -x 16 -k 1M -s 16 -d {models_dir} -Z "{url}"

def install_checkpoint():
  for customModel in modelList:
    install(customModel)

install_checkpoint()



In [None]:
#@title ## 1.3. Download VAE
os.chdir(vaes_dir)

installVae = []
#@markdown ### Available VAE
#@markdown Select one of the VAEs to download, select `none` for not download VAE:
vaeUrl = ["", \
          "https://huggingface.co/Linaqruf/personal-backup/resolve/main/vae/animevae.pt", \
          "https://huggingface.co/hakurei/waifu-diffusion-v1-4/resolve/main/vae/kl-f8-anime.ckpt", \
          "https://huggingface.co/stabilityai/sd-vae-ft-mse-original/resolve/main/vae-ft-mse-840000-ema-pruned.ckpt"]
vaeList = ["none", \
           "anime.vae.pt", \
           "waifudiffusion.vae.pt", \
           "stablediffusion.vae.pt"]
vaeName = "anime.vae.pt" #@param ["none", "anime.vae.pt", "waifudiffusion.vae.pt", "stablediffusion.vae.pt"]

installVae.append((vaeName, vaeUrl[vaeList.index(vaeName)]))

def install(vae_name, url):
  hf_token = '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'
  user_header = f"\"Authorization: Bearer {hf_token}\""
  !aria2c --console-log-level=error --summary-interval=10 --header={user_header} -c -x 16 -k 1M -s 16 -d {vaes_dir} -o {vae_name} "{url}"

def install_vae():
  if vaeName != "none":
    for vae in installVae:
      install(vae[0], vae[1])
  else:
    pass

install_vae()


# 2. Model Conversion

In [None]:
import os
%store -r
#@title ## 7.2. Model Pruner

os.chdir(tools_dir)

if not os.path.exists('prune.py'):
  !wget https://raw.githubusercontent.com/lopho/stable-diffusion-prune/main/prune.py

#@markdown Convert to Float16
fp16 = False #@param {'type':'boolean'}
#@markdown Use EMA for weights
ema = False #@param {'type':'boolean'}
#@markdown Strip CLIP weights
no_clip = False #@param {'type':'boolean'}
#@markdown Strip VAE weights
no_vae = False #@param {'type':'boolean'}
#@markdown Strip depth model weights
no_depth = False #@param {'type':'boolean'}
#@markdown Strip UNet weights
no_unet = False #@param {'type':'boolean'}

input = "" #@param {'type' : 'string'}

print(f"Loading model from {input}")

input_path = os.path.dirname(input)
base_name = os.path.basename(input)
output_name = base_name.split('.')[0]

if fp16:
    print("Converting to float16")
    output_name += '-fp16'
if ema:
    print("Using EMA for weights")
    output_name += '-ema'
if no_clip:
    print("Stripping CLIP weights")
    output_name += '-no-clip'
if no_vae:
    print("Stripping VAE weights")
    output_name += '-no-vae'
if no_depth:
    print("Stripping depth model weights")
    output_name += '-no-depth'
if no_unet:
    print("Stripping UNet weights")
    output_name += '-no-unet'
output_name += '-pruned'
output_path = os.path.join(input_path, output_name + ('.ckpt' if input.endswith(".ckpt") else ".safetensors"))

!python3 prune.py "{input}" \
  "{output_path}" \
  {'--fp16' if fp16 else ''} \
  {'--ema' if ema else ''} \
  {'--no-clip' if no_clip else ''} \
  {'--no-vae' if no_vae else ''} \
  {'--no-depth' if no_depth else ''} \
  {'--no-unet' if no_unet else ''}

print(f"Saving pruned model to {output_path}")

In [None]:
#@title ## 7.1. Convert Diffusers to Checkpoint
import os
%store -r

os.chdir(tools_dir)

#@markdown ### Conversion Config
model_to_load = "" #@param {'type': 'string'}
model_to_save = os.path.splitext(model_to_load)[0]
convert = "checkpoint_to_diffusers" #@param ["diffusers_to_checkpoint", "checkpoint_to_diffusers"] {'allow-input': false}
v2 = False #@param {type:'boolean'}
global_step = 0 #@param {'type': 'number'}
epoch = 0 #@param {'type': 'number'}
use_safetensors = True #@param {'type': 'boolean'}
save_precision = "--float" #@param ["--fp16","--bf16","--float"] {'allow-input': false}

#@markdown Additional option for diffusers
feature_extractor = True #@param {'type': 'boolean'}
safety_checker = True #@param {'type': 'boolean'}

reference_model = "stabilityai/stable-diffusion-2-1" if v2 else "runwayml/stable-diffusion-v1-5" 

if convert == "diffusers_to_checkpoint":
  model_output = f"{model_to_save}.safetensors" if use_safetensors else f"{model_to_save}.ckpt"
  if not model_to_load.endswith(".ckpt") or model_to_load.endswith(".safetensors"):
    !python convert_diffusers20_original_sd.py \
        "{model_to_load}" \
        "{model_output}" \
        --global_step {global_step} \
        --epoch {epoch} \
        {save_precision}
else:    
    !python convert_diffusers20_original_sd.py \
        "{model_to_load}" \
        "{model_to_save}" \
        {"--fp16" if save_precision == "--fp16" else ""} \
        --global_step {global_step} \
        --epoch {epoch} \
        {"--use_safetensors" if use_safetensors else ""} \
        {"--v2" if v2 else "--v1"} \
        --reference_model {reference_model} 

    url1 = "https://huggingface.co/CompVis/stable-diffusion-safety-checker/resolve/main/preprocessor_config.json"
    url2 = "https://huggingface.co/CompVis/stable-diffusion-safety-checker/resolve/main/config.json"
    url3 = "https://huggingface.co/CompVis/stable-diffusion-safety-checker/resolve/main/pytorch_model.bin"

    if feature_extractor == True:
      if not os.path.exists(f'{model_to_save}/feature_extractor'):
        os.makedirs(f'{model_to_save}/feature_extractor')
      
      !aria2c --console-log-level=error --summary-interval=10 -c -x 16 -k 1M -s 16 -d '{model_to_save}/feature_extractor' -o 'preprocessor_config.json' {url1}

    if safety_checker == True:
      if not os.path.exists(f'{model_to_save}/safety_checker'):
        os.makedirs(f'{model_to_save}/safety_checker')
      
      !aria2c --console-log-level=error --summary-interval=10 -c -x 16 -k 1M -s 16 -d '{model_to_save}/safety_checker' -o 'config.json' {url2}
      !aria2c --console-log-level=error --summary-interval=10 -c -x 16 -k 1M -s 16 -d '{model_to_save}/safety_checker' -o 'pytorch_model.bin' {url3}


In [None]:
#@title 2.3. Replace VAE of Existing Model 

os.chdir(tools_dir)
if not os.path.exists('merge_vae.py'):
  !wget https://raw.githubusercontent.com/Linaqruf/kohya-trainer/main/tools/merge_vae.py

#@markdown You need to input model ends with `.ckpt`, because `.safetensors` model won't work.

target_model = "" #@param {'type': 'string'}
target_vae = "/content/vae/anime.vae.pt" #@param {'type': 'string'}
use_safetensors = False #@param {type:'boolean'}
# get the base file name and directory
base_name = os.path.basename(target_model)
base_dir = os.path.dirname(target_model)

# get the file name without extension
file_name = os.path.splitext(base_name)[0]

# create the new file name
new_file_name = file_name + "-vae-swapped"

# get the file extension
file_ext = os.path.splitext(base_name)[1]

# create the output file path
output_model = os.path.join(base_dir, new_file_name + file_ext)

!python merge_vae.py \
  {target_model} \
  {target_vae} \
  {output_model}



In [None]:
#@title 2.4. Convert CKPT-2-Safetensors

import os
import torch
from safetensors.torch import load_file, save_file
from torch import load, save

model_path = "" #@param {type: 'string'}

def is_safetensors(path):
  return os.path.splitext(path)[1].lower() == '.safetensors'

def convert(model_path):
  print("Loading model:", os.path.basename(model_path))
  
  try:
      with torch.no_grad():
          print("Conversion in progress, please wait...")
          if is_safetensors(model_path):
            model = load_file(model_path, device="cpu")
          else:
            model = load(model_path, map_location="cpu")
          
          if 'state_dict' in model:
            sd = model['state_dict']
          else:
            sd = model

          save_to = ".ckpt" if is_safetensors(model_path) else ".safetensors"
          output = os.path.splitext(model_path)[0] + save_to

          if is_safetensors(model_path):
            save(sd, output)
          else:
            save_file(sd, output)

      print(f'Successfully converted {os.path.basename(model_path)} to {os.path.basename(output)}')
      print(f'located in this path : {output}')
  except Exception as ex:
      print(f'ERROR converting {os.path.basename(model_path)}: {ex}')

  print('Done!')

def main():
  convert(model_path)
main()


# VIII. Deployment

In [None]:
#@title ## 8.1. Upload Config
from huggingface_hub import login
from huggingface_hub import HfApi
from huggingface_hub.utils import validate_repo_id, HfHubHTTPError

#@markdown Login to Huggingface Hub 
#@markdown > Get **your** huggingface `WRITE` token [here](https://huggingface.co/settings/tokens)
write_token = "" #@param {type:"string"}
login(write_token, add_to_git_credential=True)

api = HfApi()
user = api.whoami(write_token)

#@markdown Fill this if you want to upload to your organization, or just leave it empty.

orgs_name = "" #@param{type:"string"}

#@markdown If your model/dataset repo didn't exist, it will automatically create your repo.
model_name = "" #@param{type:"string"}
make_this_model_private = False #@param{type:"boolean"}

if orgs_name == "":
  model_repo = user['name']+"/"+model_name.strip()
else:
  model_repo = orgs_name+"/"+model_name.strip()

if model_name != "":
  try:
      validate_repo_id(model_repo)
      api.create_repo(repo_id=model_repo, 
                      private=make_this_model_private)
      print("Model Repo didn't exists, creating repo")
      print("Model Repo: ",model_repo,"created!\n")

  except HfHubHTTPError as e:
      print(f"Model Repo: {model_repo} exists, skipping create repo\n")

## 8.2. Upload with Huggingface Hub

In [None]:
#@title ### 8.2.1. Upload Model
from huggingface_hub import HfApi
from huggingface_hub.utils import validate_repo_id, HfHubHTTPError
from pathlib import Path

api = HfApi()

#@markdown This will be uploaded to model repo

model_path = "/content/models/anything-v3-1.ckpt" #@param {type :"string"}
path_in_repo = "" #@param {type :"string"}
revision = "" #@param {type :"string"}
if revision:
  api.create_branch(repo_id=model_repo, 
                branch=revision, 
                exist_ok=True)
else:
  revision = "main"
#@markdown Other Information
commit_message = "" #@param {type :"string"}


if os.path.exists(model_path):
  vae_exists = os.path.exists(os.path.join(model_path, 'vae'))
  unet_exists = os.path.exists(os.path.join(model_path, 'unet'))
  text_encoder_exists = os.path.exists(os.path.join(model_path, 'text_encoder'))
    
def upload_model(model_paths, is_folder :bool, commit_message):
  path_obj = Path(model_paths)
  trained_model = path_obj.parts[-1]
  
  if path_in_repo:
    trained_model = path_in_repo
    
  if is_folder == True:
    print(f"Uploading {trained_model} to https://huggingface.co/"+model_repo)
    print(f"Please wait...")

    if vae_exists and unet_exists and text_encoder_exists:
      if not commit_message:
        commit_message = f"feat: upload diffusers version of {trained_model}"

      api.upload_folder(
          folder_path=model_paths,
          repo_id=model_repo,
          revision=revision,
          commit_message=commit_message,
          ignore_patterns=".ipynb_checkpoints"
          )
    
    else:
      if not commit_message:
        commit_message = f"feat: upload {trained_model} checkpoint folder"

      api.upload_folder(
          folder_path=model_paths,
          path_in_repo=trained_model,
          repo_id=model_repo,
          revision=revision,
          commit_message=commit_message,
          ignore_patterns=".ipynb_checkpoints"
          )
    print(f"Upload success, located at https://huggingface.co/"+model_repo+"/tree/main\n")
  else: 
    print(f"Uploading {trained_model} to https://huggingface.co/"+model_repo)
    print(f"Please wait...")
    if not commit_message:
      if model_paths.endswith(".safetensors"):
        commit_message = f"feat: upload safetensors version of {trained_model} "
      else:
        commit_message = f"feat: upload {trained_model} checkpoint"
            
    api.upload_file(
        path_or_fileobj=model_paths,
        path_in_repo=trained_model,
        repo_id=model_repo,
        revision=revision,
        commit_message=commit_message,
        )
        
    print(f"Upload success, located at https://huggingface.co/"+model_repo+"/blob/main/"+trained_model+"\n")
      
def upload():
    if model_path.endswith((".ckpt", ".safetensors", ".pt")):
      upload_model(model_path, False, commit_message)
    else:
      upload_model(model_path, True, commit_message)

upload()

## 8.3. Upload with GIT (Alternative)

In [None]:
#@title ### 8.3.1. Clone Repository
%cd /content/
clone_model = True #@param {'type': 'boolean'}

!git lfs install --skip-smudge
!export GIT_LFS_SKIP_SMUDGE=1

if clone_model:
  !git clone https://huggingface.co/{model_repo} /content/{model_name}

In [None]:
#@title ### 8.3.2. Commit using Git 
import os

os.chdir(root_dir)

#@markdown Choose which repo you want to commit
commit_model = True #@param {'type': 'boolean'}
commit_dataset = True #@param {'type': 'boolean'}
#@markdown #### Other Information
commit_message = "" #@param {type :"string"}

if not commit_message:
  commit_message = f"feat: upload {model_name}"

!git config --global user.email "example@mail.com"
!git config --global user.name "example"

def commit(repo_folder, commit_message):
  os.chdir(os.path.join(root_dir, repo_folder))
  !git lfs install
  !huggingface-cli lfs-enable-largefiles .
  !git add .
  !git commit -m "{commit_message}"
  !git push

commit(model_name, commit_message)
