<a href="https://colab.research.google.com/github/amansyayf/Dreambooth_LoRA/blob/main/inference.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%%capture
!cd /content/
!git clone https://github.com/amansyayf/Dreambooth_LoRA
!pip install -r "Dreambooth_LoRA/requirements.txt"
!pip install -U --pre triton
!pip install torchinfo

!git clone https://github.com/brian6091/lora --branch v0.0.5 --single-branch
!python -m pip install /content/lora/

In [None]:
!nvidia-smi -L

# Tested with Tesla T4 and A100 GPUs
!pip install xformers==0.0.16rc425

In [None]:
#@title ## Mount Google Drive
# Colab-only: mounts Google Drive at /content/gdrive. Later cells read the
# trained LoRA weights from, and write configs and the image grid to, a
# directory under this mount point.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
import torch
from diffusers import DiffusionPipeline, StableDiffusionPipeline, DPMSolverMultistepScheduler, AutoencoderKL
from PIL import Image
import os
import json
import random
import string
from lora_diffusion import monkeypatch_lora, tune_lora_scale

# Device string for GPU inference.
# NOTE(review): the helpers below take their own `device` argument or
# hard-code "cuda" rather than reading this global — confirm it is still needed.
device = "cuda"

def image_grid(imgs, rows, cols):
    """
        Paste images into a single ``rows`` x ``cols`` grid image.

        Images are laid out row by row: image ``i`` is placed in row
        ``i // cols`` and column ``i % cols``. The cell size is taken from
        the first image.

        Args:
            imgs: sequence of PIL images; must hold exactly ``rows * cols`` items.
            rows: number of grid rows.
            cols: number of grid columns.

        Returns:
            A new RGB image of size ``(cols * w, rows * h)``, where ``(w, h)``
            is the size of ``imgs[0]``.

        Raises:
            AssertionError: if ``len(imgs) != rows * cols``.
    """
    assert len(imgs) == rows*cols, (
        f"expected {rows*cols} images for a {rows}x{cols} grid, got {len(imgs)}"
    )
    w, h = imgs[0].size
    grid = Image.new('RGB', size=(cols*w, rows*h))
    for i, img in enumerate(imgs):
        row, col = divmod(i, cols)
        grid.paste(img, box=(col*w, row*h))
    return grid

def get_pipeline(model_name_or_path,
                 vae_name_or_path=None,
                 text_encoder_name_or_path=None,
                 feature_extractor_name_or_path=None,
                 revision="fp16"):
    """
        Load the pipeline of the original model (no LoRA weights applied).

        The pipeline is loaded in float16 with the safety checker disabled,
        a DPM-Solver++ multistep scheduler, and xformers memory-efficient
        attention enabled, then moved to the "cuda" device.

        Args:
            model_name_or_path: model passed to ``DiffusionPipeline.from_pretrained``.
            vae_name_or_path: optional separate VAE. When omitted, the VAE is
                loaded from the ``vae`` subfolder of ``model_name_or_path``
                at the same ``revision``.
            text_encoder_name_or_path: accepted but not used by this function.
            feature_extractor_name_or_path: forwarded unchanged as the
                ``feature_extractor`` argument of the pipeline.
            revision: revision used for the pipeline (and for the VAE when it
                comes from the same model).

        Returns:
            The loaded pipeline.
    """
    scheduler_kwargs = dict(
        beta_start=0.00085,
        beta_end=0.012,
        beta_schedule="scaled_linear",
        num_train_timesteps=1000,
        trained_betas=None,
        prediction_type="epsilon",
        thresholding=False,
        algorithm_type="dpmsolver++",
        solver_type="midpoint",
        lower_order_final=True,
    )
    scheduler = DPMSolverMultistepScheduler(**scheduler_kwargs)

    # A standalone VAE is loaded from its own root at its default revision;
    # otherwise the VAE bundled with the model is used.
    if vae_name_or_path:
        vae_source, vae_subfolder, vae_revision = vae_name_or_path, None, None
    else:
        vae_source, vae_subfolder, vae_revision = model_name_or_path, "vae", revision

    vae = AutoencoderKL.from_pretrained(
        vae_source,
        subfolder=vae_subfolder,
        revision=vae_revision,
        torch_dtype=torch.float16,
    )

    pipe = DiffusionPipeline.from_pretrained(
        model_name_or_path,
        safety_checker=None,
        revision=revision,
        scheduler=scheduler,
        vae=vae,
        feature_extractor=feature_extractor_name_or_path,
        torch_dtype=torch.float16,
    )
    pipe = pipe.to("cuda")

    # xformers attention is used instead of attention slicing
    # (pipe.enable_attention_slicing()); see
    # https://github.com/huggingface/diffusers/issues/1552
    pipe.enable_xformers_memory_efficient_attention()
    return pipe

# Monkey patch LoRA pt files
# Returns pipeline
def get_lora_pipeline(model_dir, scale_unet=1.0, scale_text_encoder=1.0,
                      model_name_or_path=None):
    """
        Load the base pipeline and monkey-patch trained LoRA weights into it.

        Args:
            model_dir: directory containing ``lora_unet.pt`` and
                ``lora_text_encoder.pt``.
            scale_unet: scale passed to ``tune_lora_scale`` for the UNet.
            scale_text_encoder: scale passed to ``tune_lora_scale`` for the
                text encoder.
            model_name_or_path: base model to load. Defaults to the
                notebook-level ``MODEL_NAME_OR_PATH`` when omitted.

        Returns:
            The pipeline with LoRA weights patched into its UNet and text encoder.
    """
    if model_name_or_path is None:
        # Fall back to the notebook-wide setting, as before.
        model_name_or_path = MODEL_NAME_OR_PATH

    pipe = get_pipeline(model_name_or_path)

    # NOTE(security): torch.load unpickles the file; only load weight files
    # from a trusted source.
    print('Monkey patching unet pt file')
    monkeypatch_lora(pipe.unet, torch.load(os.path.join(model_dir, "lora_unet.pt")))

    print('Monkey patching text encoder pt file')
    monkeypatch_lora(pipe.text_encoder, torch.load(os.path.join(model_dir, "lora_text_encoder.pt")), target_replace_module=["CLIPAttention"])

    tune_lora_scale(pipe.unet, scale_unet)
    tune_lora_scale(pipe.text_encoder, scale_text_encoder)

    return pipe

def get_config(filename=None,
               save_dir=None,
               prompt=None, negative_prompt=None,
               seeds=None,
               num_samples=4,
               width=512, height=512,
               inference_steps=20,
               guidance_scale=7.5,
               ):
    """
        Load an inference configuration from disk, or build and save a new one.

        When ``filename`` is given, the JSON file is loaded and returned
        as-is; all other arguments are ignored. Otherwise a new configuration
        is built from the arguments, written to
        ``<save_dir>/config_<tag>.json`` (``tag`` is 8 random ASCII letters)
        and returned.

        Args:
            filename: path of a previously saved configuration, or None.
            save_dir: directory the new configuration file is written to.
            prompt: list of prompts (a single string is treated as one prompt).
            negative_prompt: one-element list holding the negative prompt;
                None becomes ``[""]`` and a single string is wrapped in a list.
            seeds: list of seeds, one per sample. When None, ``num_samples``
                seeds ``0, 1000000, 2000000, ...`` are generated; when given,
                ``num_samples`` is set to ``len(seeds)``.
            num_samples: number of seeds to generate when ``seeds`` is None.
            width, height: image size in pixels.
            inference_steps: number of sampler steps.
            guidance_scale: list of guidance scales (a single number is
                wrapped in a list).

        Returns:
            The configuration dict.

        Raises:
            ValueError: if a new configuration is requested without ``prompt``
                or without ``save_dir``.
    """
    if filename is not None:
        # Context manager so the file handle is always closed.
        with open(filename) as infile:
            return json.load(infile)

    if prompt is None:
        raise ValueError("prompt is required when no config filename is given")
    if save_dir is None:
        raise ValueError("save_dir is required when no config filename is given")

    # The image generation code iterates over prompts and guidance scales and
    # multiplies the negative prompt list, so normalise scalars to lists.
    if isinstance(prompt, str):
        prompt = [prompt]
    if negative_prompt is None:
        negative_prompt = [""]
    elif isinstance(negative_prompt, str):
        negative_prompt = [negative_prompt]
    if isinstance(guidance_scale, (int, float)):
        guidance_scale = [guidance_scale]

    if seeds is None:
        seeds = [i * 1000000 for i in range(num_samples)]
    else:
        num_samples = len(seeds)

    tag = ''.join(random.choice(string.ascii_letters) for _ in range(8))
    config = {
        "tag": tag,
        "prompt": prompt,
        "negative_prompt": negative_prompt,
        "num_prompts": len(prompt),
        "num_samples": num_samples,
        "seeds": seeds,
        "height": height,
        "width": width,
        "inference_steps": inference_steps,
        "guidance_scale": guidance_scale,
    }

    with open(os.path.join(save_dir, "config_"+tag+".json"), "w") as outfile:
        json.dump(config, outfile)

    return config

def get_images(pipe, sample_config, device="cuda"):
    """
        Generate images for every seed / guidance-scale combination.

        For each seed a single latent of shape
        ``(1, in_channels, height // 8, width // 8)`` is drawn and repeated
        for every prompt, so all prompts (and all guidance scales) of one
        seed start from the same noise.

        Args:
            pipe: pipeline to call; must expose ``unet.in_channels`` and
                return an object with an ``images`` list.
            sample_config: configuration dict with the keys ``prompt``,
                ``negative_prompt``, ``num_prompts``, ``num_samples``,
                ``seeds``, ``height``, ``width``, ``inference_steps`` and
                ``guidance_scale``.
            device: device used for the random generator, the latents and
                autocast.

        Returns:
            Flat list of images ordered by seed, then guidance scale, then prompt.
    """
    prompts = sample_config['prompt']
    negative_prompts = sample_config['negative_prompt'] * len(prompts)
    num_steps = int(sample_config['inference_steps'])
    latent_shape = (
        1,
        pipe.unet.in_channels,
        sample_config['height'] // 8,
        sample_config['width'] // 8,
    )

    # The generator must live on the same device as the latents it produces.
    generator = torch.Generator(device)
    all_images = []

    # autocast expects a bare device type ("cuda"), not e.g. "cuda:0".
    with torch.autocast(torch.device(device).type):
        # Loop in order to use the defined seed for each image in a batch.
        for i in range(sample_config['num_samples']):
            # The latent depends only on the seed, so it is drawn once per
            # seed and shared by all guidance scales.
            generator.manual_seed(sample_config['seeds'][i])
            latent = torch.randn(latent_shape, generator=generator, device=device)

            for cfg in sample_config['guidance_scale']:
                images = pipe(
                    prompts,
                    negative_prompt=negative_prompts,
                    num_inference_steps=num_steps,
                    guidance_scale=cfg,
                    latents=latent.repeat(sample_config['num_prompts'], 1, 1, 1),
                ).images
                all_images.extend(images)

    return all_images

def make_reversed_order(images, rows, cols):
    """
        Reorder images so that a row-by-row grid shows each input group as a column.

        The input is read as ``cols`` consecutive groups of ``rows`` images.
        In the returned list, position ``i * cols + j`` holds input item
        ``rows * j + i``.

        Args:
            images: flat list of at least ``rows * cols`` items.
            rows: number of items per input group (grid rows).
            cols: number of input groups (grid columns).

        Returns:
            A new list of ``rows * cols`` items in the reordered sequence.
    """
    # Read from the `images` argument rather than a notebook-level global.
    return [images[rows*j + i] for i in range(rows) for j in range(cols)]


In [None]:
# Base model that the LoRA weights are patched onto; it is passed to
# DiffusionPipeline.from_pretrained via get_pipeline.
MODEL_NAME_OR_PATH = "runwayml/stable-diffusion-v1-5"
# Directory holding the trained LoRA model folders; generated config files and
# the final image grid are written here as well.
OUTPUT_DIR =  "/content/gdrive/MyDrive/experiment" #Where trained LoRA models are located

In [None]:
#@title Specify which models to do inference with
# One sub-directory of OUTPUT_DIR per trained LoRA model
# (presumably named after the training step count — confirm).
model_list = [
    os.path.join(OUTPUT_DIR, checkpoint)
    for checkpoint in ('500', '1000', '1500', '2000', '2500')
]

print(model_list)

In [None]:
#@title Generate or load a configuration for inference

# Leave as None to build (and save) a fresh configuration. To reuse a
# previously saved one, point this at its file instead, e.g.
#   config_name = os.path.join(OUTPUT_DIR, "config_ZMasiqkP.json")
config_name = None

if config_name is not None:
    config = get_config(filename=config_name)
else:
    prompt = [
        "sks penguin",
        "close-up sks penguin",
        "sks penguin in front of eiffel tower",
        "sks penguin riding a bicycle",
        "sks penguin wearing sunglasses and holding a phone",
    ]
    negative_prompt = [""]
    guidance_scale = [7.5]
    seeds = [2000000]

    config = get_config(
        save_dir=OUTPUT_DIR,
        prompt=prompt,
        negative_prompt=negative_prompt,
        seeds=seeds,
        width=512,
        height=512,
        inference_steps=50,
        guidance_scale=guidance_scale,
    )


In [None]:
#@title Make inference

LORA_SCALE_UNET = 1.0
LORA_SCALE_TENC = 1.0

all_images = []

for model in model_list:
    pipe = get_lora_pipeline(model, scale_unet=LORA_SCALE_UNET, scale_text_encoder=LORA_SCALE_TENC)
    images = get_images(pipe, config)
    # Unpack the list so every image is rendered; display() on the list
    # itself only shows the list's text representation.
    display(*images)
    all_images.extend(images)

    # Drop the pipeline and release cached GPU memory before the next model
    # is loaded.
    del pipe
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

In [None]:
# One grid column per model and one row per image generated for each model.
# Deriving the size from the data keeps the grid valid when the number of
# models, prompts, seeds or guidance scales changes.
grid_cols = len(model_list)
grid_rows = len(all_images) // grid_cols

images = make_reversed_order(all_images, grid_rows, grid_cols)
grid = image_grid(images, rows=grid_rows, cols=grid_cols)
grid.save(os.path.join(OUTPUT_DIR, "grid.jpg"), quality=90, optimize=True)

In [None]:
#@title # Close Colab instance
# Colab-only: presumably disconnects and releases the runtime once the run is
# finished, so run this cell last — TODO confirm against the Colab docs.
from google.colab import runtime
runtime.unassign()