In [None]:
import subprocess
from google.colab import output
#@markdown Check the type of GPU and the VRAM available. Make sure we have at least a Tesla T4 GPU.

# Minimum total VRAM, in the unit nvidia-smi reports (MiB). 15109 MiB is the
# total this notebook's recorded output shows for a Tesla T4.
MIN_VRAM_MIB = 15109

# Ask nvidia-smi for name, total and free memory; it prints one CSV line per GPU.
try:
    result = subprocess.run(
        ["nvidia-smi", "--query-gpu=name,memory.total,memory.free", "--format=csv,noheader"],
        capture_output=True,
        check=True,
    )
except (FileNotFoundError, subprocess.CalledProcessError):
    # nvidia-smi is missing or failed: explain what to do, then surface the
    # original exception unchanged so the cell still stops.
    print("\033[91mError: Could not query a GPU with nvidia-smi. Please change the runtime to a GPU runtime.\033[0m")
    raise

# One non-empty line per GPU
vram_info = [line for line in result.stdout.decode("utf-8").splitlines() if line.strip()]

# Parse the VRAM info for each GPU and remember every total for the check below
totals = []
for info in vram_info:
    # Split from the right so a comma inside the GPU name cannot break unpacking
    name, total, free = info.rsplit(",", 2)
    total = int(total.strip().split()[0])  # Total VRAM, numeric part of e.g. "15109 MiB"
    free = int(free.strip().split()[0])  # Free VRAM, numeric part

    totals.append(total)
    print(f"GPU: {name}, Total VRAM: {total} MB, Free VRAM: {free} MB")

# Judge all reported GPUs instead of whichever happened to be parsed last.
# NOTE(review): the check passes when any GPU is large enough; confirm this
# matches how the training run selects its device.
if not totals or max(totals) < MIN_VRAM_MIB:
    # Display an error message in red text
    print("\033[91mError: Not enough VRAM available. Please change the runtime to a GPU with at least 15GB VRAM.\033[0m")
else:
    print("\033[92mYou have enough VRAM to continue\033[0m")

Tesla T4, 15109 MiB, 15109 MiB


# Install

In [None]:
#@markdown Install dependencies. You don't need to change any of these settings. Make sure to run this before running any cells below.
# Download the training script, the checkpoint conversion script and the concepts
# list into the current working directory (-q: quiet, -O: fixed output file name,
# so re-running this cell overwrites the previous copies).
!wget -q -O train_dreambooth.py https://github.com/geocine/smd-diffusion/raw/main/train_dreambooth.py
!wget -q -O convert_diffusers_to_original_stable_diffusion.py https://github.com/geocine/smd-diffusion/raw/main/convert_diffusers_to_original_stable_diffusion.py
!wget -q -O concepts_list.json https://github.com/geocine/smd-diffusion/raw/main/concepts_list.json
# Imports used by the progress-bar widget and the package installer defined below
import subprocess
from ipywidgets import IntProgress, HTML, HBox
import time
from datetime import timedelta

class ProgressBar:
    """Notebook progress bar built from an ``IntProgress`` and two ``HTML`` labels.

    The left label shows ``"<label_text>: <percent>%"``, the right label shows
    free text describing the item that was just processed. The widgets are
    displayed as soon as the object is created.
    """

    def __init__(self, num_items, label_text='Progress'):
        """Create and display the widgets.

        Args:
            num_items: Number of ``update()`` calls that correspond to 100%.
            label_text: Prefix shown in front of the percentage.
        """
        self.num_items = num_items
        self.start_time = time.perf_counter()
        self.count = 0
        self.label_text = label_text

        # Create a progress bar and HTML widgets to display the labels and progress bar
        self.f = IntProgress(min=0, max=num_items)
        self.label1 = HTML(value=f'{label_text}: 0%')
        self.label2 = HTML(value='', layout=dict(margin='2px 0 0 10px'))

        # Group the widgets horizontally using the HBox layout
        display(HBox([self.label1, self.f, self.label2]))

    def update(self, label=''):
        """Advance the bar by one item and show ``label`` next to it."""
        self.count += 1
        self.f.value += 1
        if self.num_items:
            percentage = f'{self.f.value / self.num_items * 100:.0f}'
        else:
            # Nothing to process: avoid dividing by zero and report completion
            percentage = '100'
        self.label1.value = f'{self.label_text}: {percentage}%'
        self.label2.value = label
        # change bar color to green if done
        if self.f.value >= self.num_items:
            self.f.bar_style = 'success'

    def error(self, label=''):
        """Mark the bar as failed.

        Args:
            label: Message to show; when empty the generic
                ``'Stopped due to error'`` text is used.
        """
        self.label2.value = label or 'Stopped due to error'
        self.f.bar_style = 'danger'

# diffusers is installed by install_package from this Git repository (pip VCS URL)
DIFFUSERS_URL = 'git+https://github.com/ShivamShrirao/diffusers'
# Prebuilt xformers wheel; its file name tags it as cp38 / linux_x86_64
XFORMERS_URL = 'https://github.com/geocine/dreamstall-binaries/releases/download/cxx-p38-txx-linux/xformers-0.0.15.dev0+4c06c79.d20221205-cp38-cp38-linux_x86_64.whl'
# Colab form toggle; passed to install_package as force_reinstall
FORCE_REINSTALL = False #@param {type:"boolean"}

def _normalize_dist_name(name):
    """Return ``name`` lower-cased with runs of ``-``, ``_`` and ``.`` folded to ``-``."""
    import re

    return re.sub(r"[-_.]+", "-", name.strip()).lower()


def _installed_distributions():
    """Return ``{normalized_name: version_or_None}`` parsed from ``pip freeze``.

    The version is ``None`` for entries that are not listed as ``name==version``
    (``name @ url`` and editable ``-e ...#egg=name`` lines).
    """
    frozen = subprocess.run(["pip", "freeze"], capture_output=True, text=True).stdout
    installed = {}
    for line in frozen.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        version = None
        if line.startswith("-e "):
            name = line.partition("#egg=")[2].split("&")[0]
        elif " @ " in line:
            name = line.split(" @ ", 1)[0]
        else:
            name, _, version = line.partition("==")
            version = version or None
        if name:
            installed[_normalize_dist_name(name)] = version
    return installed


def install_package(package, force_reinstall=False):
    """Install ``package`` with pip unless it is already present.

    Args:
        package: Requirement to install, either a bare name or ``name==version``.
            ``'diffusers'`` and ``'xformers'`` are installed from
            ``DIFFUSERS_URL`` / ``XFORMERS_URL`` instead of the package index.
        force_reinstall: When true, skip the "already installed" check and
            always run ``pip install``.

    Returns:
        ``'<package> is already installed'`` or ``'<package> is installed'``.

    Raises:
        subprocess.CalledProcessError: If ``pip install`` exits with an error.
    """
    requested_name, _, requested_version = package.partition("==")

    if not force_reinstall:
        # Compare distribution names exactly; a substring test would treat
        # e.g. "sentence-transformers" as a hit for "transformers".
        installed = _installed_distributions()
        key = _normalize_dist_name(requested_name)
        if key in installed and (not requested_version or installed[key] == requested_version):
            return f'{package} is already installed'

    if package == 'diffusers':
        # Install the package using the URL
        subprocess.run(["pip", "-qq", "install", DIFFUSERS_URL], capture_output=True, text=True, check=True)
    elif package == 'xformers':
        # Install the package using the URL
        subprocess.run(["pip", "install", XFORMERS_URL], capture_output=True, text=True, check=True)
    else:
        # Install the package using pip
        subprocess.run(["pip", "install", package], capture_output=True, text=True, check=True)

    return f'{package} is installed'
# List of packages to check and install, in this order
packages = ['diffusers', 'triton', 'accelerate==0.12.0', 'transformers', 'ftfy', 'bitsandbytes', 'xformers']

# Check and install each package, advancing the progress bar once per package
pb = ProgressBar(len(packages), "Installing")
for package in packages:
    try:
        label = install_package(package, FORCE_REINSTALL)
    except subprocess.CalledProcessError as exc:
        # install_package captures pip's output, so print it here; otherwise
        # the reason for the failure would never be shown.
        pb.error(f"Failed to install {package}")
        print(f"\033[91mFailed to install {package}\033[0m")
        print(exc.stderr or exc.stdout or "")
        raise
    pb.update(label)
print("\033[92mInstallation complete\033[0m")

## Settings


<details>
  <summary>Details about <code>{SDD_TOKEN}</code> and <code>{SDD_CLASS}</code></summary>

  - `SDD_TOKEN` - corresponds to the unique identifier which will reference the subject we want to add. This name should be unique, so we don't have to compete with an existing representation
  - `SDD_CLASS` - use generic classes such as man, woman, or child (if the subject is a person) or cat or dog (if the subject is a pet)

  > You could explore other classes. In this Colab I use the class `supermodel` by default since I get good results in training my personal subjects
</details>

In [None]:
import os
import json
from huggingface_hub import hf_hub_download
import subprocess
import warnings

import shutil
import zipfile

# Disable the warning message
warnings.filterwarnings("ignore", category=UserWarning, module="IPython.core.interactiveshell")

SDD_TOKEN = "zwx" #@param {type:"string"}
SDD_CLASS = "supermodel" #@param {type:"string"}

# check if /content/concepts_list.json exists if not remind to run install
if not os.path.exists("/content/concepts_list.json"):
    # Display an error message in red text
    print("\033[93mPlease run the Install cell first\033[0m")

    # Raise a SystemExit exception to exit the cell
    raise SystemExit

# Open the JSON file and read the contents
with open("/content/concepts_list.json", "r") as f:
    json_data = json.load(f)

# Replace the {SDD_TOKEN} / {SDD_CLASS} placeholders in every string value.
# NOTE: the file is rewritten in place below, so the placeholders are gone after
# the first run; re-run the Install cell (which downloads the file again) before
# changing SDD_TOKEN or SDD_CLASS.
for item in json_data:
    for key, value in item.items():
        if isinstance(value, str):
            item[key] = value.format(SDD_TOKEN=SDD_TOKEN, SDD_CLASS=SDD_CLASS)

# Open the JSON file and write the updated contents
with open("/content/concepts_list.json", "w") as f:
    json.dump(json_data, f, indent=2)

MODEL_NAME = "runwayml/stable-diffusion-v1-5" #@param {type:"string"}

token_dir = os.path.expanduser("~/.huggingface")
os.makedirs(token_dir, exist_ok=True)
# Never store a real access token in the notebook itself: leave the default
# empty and paste the token into the form (or provide it via HF_TOKEN).
HUGGINGFACE_TOKEN = "" #@param {type:"string"}
#@markdown You have to be a registered user in 🤗 [Hugging Face](https://huggingface.co/), and you'll also need to use an [access token](https://huggingface.co/settings/tokens) for the code to work.

if not HUGGINGFACE_TOKEN:
    HUGGINGFACE_TOKEN = os.environ.get("HF_TOKEN", "")

# check if HUGGINGFACE_TOKEN is set
if not HUGGINGFACE_TOKEN:
    # Display an error message in red text
    print("\033[93mPlease set HUGGINGFACE_TOKEN first.\033[0m")

    # Raise a SystemExit exception to exit the cell
    raise SystemExit

# Write the token file directly (no trailing newline) instead of echoing it
# through a shell, so special characters in the token cannot be interpreted.
with open(os.path.join(token_dir, "token"), "w") as f:
    f.write(HUGGINGFACE_TOKEN)

OUTPUT_DIR = f"stable_diffusion_models/{SDD_TOKEN}"
OUTPUT_DIR = "/content/" + OUTPUT_DIR

if os.path.exists(OUTPUT_DIR):
    # Remove everything inside the directory but keep the directory itself.
    # This is done without changing the working directory (later cells use
    # relative paths) and without relying on shell glob expansion.
    for entry in os.scandir(OUTPUT_DIR):
        if entry.is_dir(follow_symlinks=False):
            shutil.rmtree(entry.path)
        else:
            os.remove(entry.path)
else:
    # Create the directory
    os.makedirs(OUTPUT_DIR)

print(f"[*] Models will be saved at {OUTPUT_DIR}")

unzip_directory = f"/content/data/{SDD_CLASS}"

# Make sure the unzip directory exists and list whatever is already in it
os.makedirs(unzip_directory, exist_ok=True)
files = os.listdir(unzip_directory)

#Downloading the regularization images
zip_file = f"{SDD_CLASS}.zip"
if not os.path.exists(zip_file):
    try:
        reg_url = f"https://huggingface.co/datasets/geocine/sd-v1-5-regularization-images/resolve/main/{zip_file}"
        subprocess.run(["wget", "-q", "-O", zip_file, reg_url], check=True)
    except Exception as e:
        # Print an error message and set the zip_file variable to None if the download fails or the user doesn't have access
        print(f"An error occurred while downloading the dataset: {e}")
        # Drop any empty or partial file so the next run retries the download
        if os.path.exists(zip_file):
            os.remove(zip_file)
        zip_file = None

# Check if the unzip directory has files
if len(files) > 0:
    # Do not run the unzip command
    print("Unzip directory has files. Skipping unzip.")
elif zip_file is None:
    # Do not run the unzip command
    print("Skipping unzip because the zip file was not downloaded")
else:
    # Count the archive's file members (directories excluded) so that one
    # progress step corresponds to exactly one extracted file.
    with zipfile.ZipFile(zip_file) as archive:
        file_count = sum(1 for member in archive.infolist() if not member.is_dir())

    # Run the unzip command; stderr is merged into stdout so a full stderr
    # pipe cannot stall the process while stdout is being read.
    pb = ProgressBar(file_count, "Extracting")
    process = subprocess.Popen(
        ["unzip", "-j", zip_file, "-d", unzip_directory],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )
    # Reading until EOF also consumes lines printed just before the process exits
    for line in process.stdout:
        entry = line.strip()
        if entry.startswith(("extracting:", "inflating:")):
            pb.update(entry.split(":", 1)[1].strip())

    # NOTE(review): exit status 1 is treated as "completed with warnings" per
    # the unzip manual; anything above that is reported as a failure — confirm.
    if process.wait() > 1:
        pb.error(f"unzip exited with status {process.returncode}")
        print(f"\033[91mExtracting regularization images failed (unzip exit status {process.returncode})\033[0m")
    else:
        print("\033[92mExtracting regularization images completed\033[0m")

# Train

In [None]:
#@markdown Upload your photos by running this cell. Make sure your photos are 512x512. You can batch resize your photos using [this tool](https://www.birme.net/?target_width=512&target_height=512)

import os
import json
from google.colab import files
import shutil

# check if /content/concepts_list.json exists if not remind to run install
if not os.path.exists("/content/concepts_list.json"):
    raise ValueError("Please run the Install cell first")

# The placeholders below are filled from variables defined by the Settings cell
if "SDD_TOKEN" not in globals() or "SDD_CLASS" not in globals():
    raise ValueError("Please run the Settings cell above.")

# Load the data from the JSON file into the concepts_list variable
with open("/content/concepts_list.json", "r") as f:
    concepts_list = json.load(f)


# Incorporate this so that users won't have to crop their images https://github.com/d8ahazard/sd_smartprocess
for c in concepts_list:
    prompt = c['instance_prompt'].format(SDD_TOKEN=SDD_TOKEN, SDD_CLASS=SDD_CLASS)
    # Resolve placeholders in the target directory exactly like the next cell
    # does, so the files end up where that cell looks for them.
    instance_dir = c['instance_data_dir'].format(SDD_TOKEN=SDD_TOKEN, SDD_CLASS=SDD_CLASS)
    print(f"Uploading instance images for `{prompt}`")
    uploaded = files.upload()
    if uploaded:
        # Create the instance data directory once, only when there is something to move
        os.makedirs(instance_dir, exist_ok=True)
    for filename in uploaded.keys():
        shutil.move(filename, os.path.join(instance_dir, filename))

In [None]:
import os
import json

# Load the data from the JSON file into the concepts_list variable
with open("/content/concepts_list.json", "r") as f:
    concepts_list = json.load(f)

num_images = 0

for c in concepts_list:
    data_dir = c['instance_data_dir']
    # replace the SDD_TOKEN placeholders with the actual values
    data_dir = data_dir.format(SDD_TOKEN=SDD_TOKEN, SDD_CLASS=SDD_CLASS)
    # Check if the directory exists
    if os.path.exists(data_dir):
        # Check if the directory is empty
        num_files = len(os.listdir(data_dir))
        if num_files == 0:
            raise ValueError(f"The directory `{data_dir}` is empty. Please upload some images using the cell above.")
        else:
            num_images += num_files
    else:
        # Raise an exception if the directory does not exist
        raise ValueError(f"The directory `{data_dir}` does not exist. Please run the Upload cell first.")

# count the number of images on the instance_data_dir

# compute NUM_CLASS_IMAGES based on the number of images , num_images * 200 with a limit of 4000
# min(num_images * 200, 4000)
NUM_CLASS_IMAGES = 3000 #@param {type:"number"}
#@markdown `{TOKEN_CLASS}` will be replaced with the token and class name.
SAVE_SAMPLE_PROMPT = "photo of {TOKEN_CLASS}" #@param {type:"string"}
SAVE_SAMPLE_PROMPT = SAVE_SAMPLE_PROMPT.format(TOKEN_CLASS=f"{SDD_TOKEN} {SDD_CLASS}")
MAX_TRAIN_STEPS = 3000 #@param {type:"number"}
SAVE_INTERVAL = 400 #@param {type:"number"}
SAVE_MIN_STEPS = 2000 #@param {type:"number"}
CLEAR_MODELS = True #@param {type:"boolean"}
SAMPLE_BATCH_SIZE = 4


# Check SAVE_MIN_STEPS should be should be less than or equal MAX_TRAIN_STEPS
if SAVE_MIN_STEPS > MAX_TRAIN_STEPS:
    raise ValueError("Your model will not be saved if SAVE_MIN_STEPS is greater than MAX_TRAIN_STEPS.")

#@markdown If you have experience with training models, you can change more parameters on the code in this cell.

PREV_MODEL_STEPS = None
g_cuda = None

if CLEAR_MODELS:
    # Run the rm command using subprocess
    subprocess.run(["rm", "-rf", f"/content/stable_diffusion_models/{SDD_TOKEN}/*"])

!accelerate launch train_dreambooth.py \
  --pretrained_model_name_or_path=$MODEL_NAME \
  --pretrained_vae_name_or_path="stabilityai/sd-vae-ft-mse" \
  --output_dir=$OUTPUT_DIR \
  --revision="main" \
  --with_prior_preservation --prior_loss_weight=1.0 \
  --seed=1337 \
  --resolution=512 \
  --train_batch_size=1 \
  --train_text_encoder \
  --mixed_precision="fp16" \
  --use_8bit_adam \
  --gradient_accumulation_steps=1 \
  --learning_rate=1e-6 \
  --lr_scheduler="constant" \
  --lr_warmup_steps=0 \
  --num_class_images=$NUM_CLASS_IMAGES \
  --sample_batch_size=$SAMPLE_BATCH_SIZE \
  --max_train_steps=$MAX_TRAIN_STEPS \
  --save_interval=$SAVE_INTERVAL \
  --save_min_steps=$SAVE_MIN_STEPS \
  --save_sample_prompt="$SAVE_SAMPLE_PROMPT" \
  --concepts_list="concepts_list.json"
# --shuffle_after_epoch

In [None]:
#@markdown Run to generate a grid of preview images from the last saved models.
import os
import matplotlib.pyplot as plt
import matplotlib.image as mpimg


if "OUTPUT_DIR" not in globals():
    raise ValueError("Please run the Settings cell above.")

models_folder = OUTPUT_DIR
# Keep only directories with purely numeric names (other than "0") and order
# them numerically; any other entry would make the int() sort key fail.
folders = sorted(
    (
        f for f in os.listdir(models_folder)
        if f.isdigit() and f != "0" and os.path.isdir(os.path.join(models_folder, f))
    ),
    key=int,
)

# Check if the number of folders > 0
if len(folders) == 0:
    raise ValueError("No folders found in the models folder. Please make sure you have run the training cell above.")

# Every folder must have a samples folder holding exactly SAMPLE_BATCH_SIZE images.
# The sorted file names are kept so the columns line up the same way in every row.
samples = {}
for folder in folders:
    image_folder = os.path.join(models_folder, folder, "samples")
    if not os.path.exists(image_folder):
        raise ValueError(f"The folder `{image_folder}` does not exist. Please make sure you have run the Training cell above.")
    images = sorted(os.listdir(image_folder))
    if len(images) != SAMPLE_BATCH_SIZE:
        raise ValueError(f"The folder `{image_folder}` does not have {SAMPLE_BATCH_SIZE} images. Please make sure you have run the Training cell above.")
    samples[folder] = (image_folder, images)

row = len(folders)
col = len(samples[folders[0]][1])
scale = 4
# squeeze=False always yields a 2-D array of axes, so axes[i, j] is valid even
# when the grid has a single row and/or a single column.
fig, axes = plt.subplots(
    row, col,
    figsize=(col*scale, row*scale),
    gridspec_kw={'hspace': 0, 'wspace': 0},
    squeeze=False,
)

for i, folder in enumerate(folders):
    image_folder, images = samples[folder]
    for j, image in enumerate(images):
        currAxes = axes[i, j]
        if i == 0:
            # Column header on the first row
            currAxes.set_title(f"Image {j}")
        if j == 0:
            # Row label (the folder name) to the left of the first column
            currAxes.text(-0.1, 0.5, folder, rotation=0, va='center', ha='center', transform=currAxes.transAxes)
        image_path = os.path.join(image_folder, image)
        img = mpimg.imread(image_path)
        currAxes.imshow(img, cmap='gray')
        currAxes.axis('off')

plt.tight_layout()
plt.savefig('grid.png', dpi=72)

# Generate

In [None]:
#@markdown Make sure you have run the Training cell above before running this cell.
import os
import random 
import torch
from torch import autocast
from diffusers import StableDiffusionPipeline, DDIMScheduler, EulerAncestralDiscreteScheduler
from IPython.display import display

MODEL_STEPS = 2000 #@param {type:"number"}



if "SDD_TOKEN" not in globals():
    raise ValueError("Please run the Settings cell above.")

if "PREV_MODEL_STEPS" not in globals():
    raise ValueError("Please run the training cell above.")

model_path = f'/content/stable_diffusion_models/{SDD_TOKEN}/{MODEL_STEPS}'             # If you want to use previously trained model saved in gdrive, replace this with the full path of model in gdrive

# Make sure model_path exists before trying to load from it
if not os.path.exists(model_path):
    raise ValueError(f"Model with {MODEL_STEPS} steps does not exist. Please make sure you have run the Training cell above.")

# (Re)load the pipeline only when a different checkpoint is requested or nothing is loaded yet
if MODEL_STEPS != PREV_MODEL_STEPS or g_cuda is None:
    scheduler = EulerAncestralDiscreteScheduler(num_train_timesteps=1000, beta_start=0.00085, beta_end=0.012, beta_schedule="scaled_linear")
    pipe = StableDiffusionPipeline.from_pretrained(model_path, scheduler=scheduler, safety_checker=None, torch_dtype=torch.float16).to("cuda")
    pipe.enable_xformers_memory_efficient_attention()
    g_cuda = torch.Generator(device='cuda')
    # Remember the loaded checkpoint only after loading succeeded; recording it
    # earlier would make a re-run after a failed load reuse the previous pipeline.
    PREV_MODEL_STEPS = MODEL_STEPS

SEED = -1  #@param {type:"number"}
if (SEED < 0):
    # A negative seed means "pick one at random"
    SEED = random.randint(0, 2**32 - 1)
g_cuda.manual_seed(SEED)
# Report the seed so a result can be reproduced by entering it in the form
print(f"Seed: {SEED}")

#@markdown Enter a prompt to generate images from. `{TOKEN_CLASS}` will be replaced with the token and class name.
PROMPT = "photo of {TOKEN_CLASS}" #@param {type:"string"}
PROMPT = PROMPT.format(TOKEN_CLASS=f"{SDD_TOKEN} {SDD_CLASS}")
NEGATIVE_PROMPT = "" #@param {type:"string"}
NUM_SAMPLES = 2 #@param {type:"number"}
CFG = 8 #@param {type:"number"}
STEPS = 80 #@param {type:"number"}
height = 512 #@param {type:"number"}
width = 512 #@param {type:"number"}

with autocast("cuda"), torch.inference_mode():
    images = pipe(
        prompt=PROMPT,
        height=height,
        width=width,
        negative_prompt=NEGATIVE_PROMPT,
        num_images_per_prompt=NUM_SAMPLES,
        num_inference_steps=STEPS,
        guidance_scale=CFG,
        generator=g_cuda
    ).images

for img in images:
    display(img)

# Save

In [None]:
import os
from google.colab import drive

if "SDD_TOKEN" not in globals():
    raise ValueError("Please run the Settings cell above.")

#@markdown This will save chosen checkpoint on Google Drive. You can then download it from there.
MODEL_STEPS = 2800 #@param {type:"number"}
mdl_path = f"/content/stable_diffusion_models/{SDD_TOKEN}/{MODEL_STEPS}"
mdl_path = mdl_path.replace("{TOKEN}", SDD_TOKEN)
ckpt_path =  mdl_path + "/model.ckpt"

# Make sure model_path exists
if not os.path.exists(mdl_path):
  raise ValueError(f"Model with `{MODEL_STEPS}` steps does not exist. Please make sure you have run the training cell above and this model step exist.")


!python convert_diffusers_to_original_stable_diffusion.py --model_path $mdl_path  --checkpoint_path $ckpt_path --half
print(f"[*] Converted ckpt saved at {ckpt_path}")

# Check if Google Drive is already mounted
if not os.path.exists("/content/drive"):
    # Mount Google Drive
    drive.mount("/content/drive")

NAME = "aivan" #@param {type:"string"}
#@markdown Enter the path to save the model in Google Drive. If left empty, the model will be saved in the root of Google Drive.
GDRIVE_PATH = "Files" #@param {type:"string"}

# remove / from start and end of GDRIVE_PATH if they exist
GDRIVE_PATH = GDRIVE_PATH.strip('/')
MODEL_NAME = f"{SDD_CLASS}-{MODEL_STEPS}-{SDD_TOKEN}-{NAME}"
if GDRIVE_PATH:
    cmd = f"cp /content/stable_diffusion_models/{SDD_TOKEN}/{MODEL_STEPS}/model.ckpt /content/drive/MyDrive/{GDRIVE_PATH}/{MODEL_NAME}.ckpt"
else:
    cmd = f"cp /content/stable_diffusion_models/{SDD_TOKEN}/{MODEL_STEPS}/model.ckpt /content/drive/MyDrive/{MODEL_NAME}.ckpt"

# Execute the command
!{cmd}
print(f"Model saved at /{GDRIVE_PATH}/{MODEL_NAME}. Wait for 5 minutes before closing")
print(f"To use your model on other applications make sure to mention \"{SDD_TOKEN} {SDD_CLASS}\" in the prompt.")

In [None]:
#@title Free runtime memory
# Ends the current Python session; all variables defined by the cells above are lost.
# NOTE(review): presumably the memory held by the session (including GPU memory) is
# released as a result — confirm how the hosting runtime handles exit().
exit()