<a href="https://colab.research.google.com/github/bachaudhry/FastAI-22-23/blob/main/course_part_2/01_Introduction_to_Generative_Modeling.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# A Hands-On Intro to Generative Modeling Using HF Diffusers

_GitHub's renderer tends to break with output-heavy notebooks, so the versions saved here have all outputs cleared._

_If I decide to retain outputs, use the [NB Viewer](https://nbviewer.org/github/bachaudhry/FastAI-22-23/blob/main/course_part_2/01_Introduction_to_Generative_Modeling.ipynb) link to view the notebook._

In [None]:
!pip install -Uq diffusers transformers fastcore fastdownload

In [None]:
import logging
from pathlib import Path

import matplotlib.pyplot as plt
import torch
from diffusers import StableDiffusionPipeline
from fastcore.all import concat
from huggingface_hub import notebook_login
from PIL import Image

logging.disable(logging.WARNING)

torch.manual_seed(44)
if not (Path.home()/'.cache/huggingface' / 'token').exists(): notebook_login()
from google.colab import userdata
userdata.get('HF_TOKEN')

## Setting Up the Stable Diffusion Pipeline

In [None]:
pipe = StableDiffusionPipeline.from_pretrained("CompVis/stable-diffusion-v1-4",
                                               variant="fp16",
                                               torch_dtype=torch.float16).to("cuda")

In [None]:
# Checking location of the model weights
!ls ~/.cache/huggingface/hub

In [None]:
# In case the GPU has insufficient memory
# pipe.enable_attention_slicing()

In [None]:
# Testing first prompt
prompt = "A picture of polar bear in the style of national geographic"

In [None]:
pipe(prompt).images[0]

In [None]:
# Using different seed values
torch.manual_seed(8161)
pipe(prompt).images[0]

In [None]:
# Using different seed values
torch.manual_seed(42)
pipe(prompt).images[0]

As diffusion models generate images from random noise after a series of steps, we can play around with the number of steps to see the effects on the model's outputs.

In [None]:
# Reusing the manual seed from the previous cell
torch.manual_seed(42)
pipe(prompt, num_inference_steps=3).images[0]

In [None]:
# Increase the number of steps to 10
torch.manual_seed(42)
pipe(prompt, num_inference_steps=10).images[0]

In [None]:
# Increase the number of steps to 16
torch.manual_seed(42)
pipe(prompt, num_inference_steps=16).images[0]

In [None]:
# Let's take it up to 40
torch.manual_seed(42)
pipe(prompt, num_inference_steps=40).images[0]

In [None]:
# Cranking to 100
torch.manual_seed(42)
pipe(prompt, num_inference_steps=100).images[0]

## Classifier Free Guidance

Classifier-free guidance increases how closely the outputs adhere to the conditioning signal, which here is the prompt.

Larger guidance scales increase adherence at the expense of diversity. The default `guidance_scale` is `7.5`.
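
To make the role of the guidance scale concrete, here is a minimal sketch of the guidance arithmetic on plain Python lists. The two "predictions" are made-up numbers standing in for the model's unconditional and prompt-conditioned noise predictions, and `apply_guidance` is an illustrative helper rather than part of diffusers.

```python
def apply_guidance(pred_uncond, pred_text, guidance_scale):
    """Move from the unconditional prediction toward the text-conditioned one."""
    return [u + guidance_scale * (t - u) for u, t in zip(pred_uncond, pred_text)]

# Hypothetical noise predictions (the real ones are large tensors).
pred_uncond = [0.0, 1.0, -1.0]
pred_text = [1.0, 1.0, 0.0]

for g in [0.0, 1.0, 7.5]:
    print(g, apply_guidance(pred_uncond, pred_text, g))
```

A scale of `0` returns the unconditional prediction, `1` returns the text-conditioned one, and larger values extrapolate past it in the direction the prompt suggests.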

In [None]:
def image_grid(imgs, rows, cols):
  w, h = imgs[0].size
  grid = Image.new('RGB', size=(cols * w, rows * h))
  for i, img in enumerate(imgs):
    grid.paste(img, box=(i % cols * w, i // cols * h))
  return grid
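
The `box` expression in `image_grid` maps a flat image index to a row-major grid position. The sketch below isolates just that arithmetic, using a hypothetical helper `grid_boxes` and no image library.

```python
def grid_boxes(n_images, cols, w, h):
    """Top-left pixel offset for each image, filling the grid row by row."""
    return [(i % cols * w, i // cols * h) for i in range(n_images)]

# Six 512x512 images laid out in three columns.
print(grid_boxes(n_images=6, cols=3, w=512, h=512))
```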

In [None]:
# Testing guidance parameter settings
num_rows, num_cols = 4, 4
prompts = [prompt] * num_cols

In [None]:
images = concat(pipe(prompts, guidance_scale=g).images for g in [1.1, 4, 10, 20])

In [None]:
image_grid(images, rows=num_rows, cols=num_cols)

## Negative Prompts

In [None]:
torch.manual_seed(64)
prompt = "Early morning in the Himalayas"
pipe(prompt).images[0]

In [None]:
torch.manual_seed(64)
pipe(prompt, negative_prompt="red").images[0]

In [None]:
# Testing different guidance scales like we did in the previous section
torch.manual_seed(64)

num_rows, num_cols = 4, 4
prompts = [prompt] * num_cols
neg_prompt = ['blue'] * num_cols

imgs = concat(pipe(prompts, negative_prompt=neg_prompt, guidance_scale=g).images for g in [1.1, 4, 10, 20])

In [None]:
image_grid(imgs, rows=num_rows, cols=num_cols)

## Image to Image

In [None]:
!nvidia-smi

In [None]:
# Recovering GPU memory
torch.cuda.empty_cache()

In [None]:
from diffusers import StableDiffusionImg2ImgPipeline
from fastdownload import FastDownload

In [None]:
pipe = StableDiffusionImg2ImgPipeline.from_pretrained(
    "CompVis/stable-diffusion-v1-4",
    variant="fp16",
    torch_dtype=torch.float16,
).to("cuda")

In [None]:
# Using the lesson example
p = FastDownload().download('https://cdn-uploads.huggingface.co/production/uploads/1664665907257-noauth.png')
init_image = Image.open(p).convert("RGB")
init_image

In [None]:
torch.manual_seed(21)
prompt = "Owl on a moonlit night, photorealistic 4K"
images = pipe(prompt=prompt, num_images_per_prompt=3,
              image=init_image, strength=0.8, num_inference_steps=50).images
image_grid(images, rows=1, cols=3)

In [None]:
# Selecting a generated image to seed the series of prompts
init_image=images[1]

torch.manual_seed(21)
prompt = "An Owl in the style of Animal Planet."
images = pipe(prompt=prompt, num_images_per_prompt=3,
              image=init_image, strength=1, num_inference_steps=100).images
image_grid(images, rows=1, cols=3)

## Fine Tuning

This [blog post](https://lambdalabs.com/blog/how-to-fine-tune-stable-diffusion-how-we-made-the-text-to-pokemon-model-at-lambda/) shows how the folks at Lambda Labs applied fine-tuning to build a text-to-Pokémon model.

**LINK to NB**

## Textual Inversion

Using this technique, we can "teach" the text model a new word and train an embedding for it.

The token vocabulary is extended with the new token, all model weights are frozen apart from the text encoder, and training then runs on a small sample of representative images.
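
As a rough illustration of what "extending the vocabulary" means, the toy sketch below registers a new token and appends one embedding row while leaving the existing rows untouched. The vocabulary, the `add_token` helper, the `<my-style>` token, and the vector values are all hypothetical stand-ins, not the real tokenizer or learned embedding.

```python
import random

# Toy stand-ins for a tokenizer vocabulary and an embedding table.
vocab = {"a": 0, "photo": 1, "of": 2}
embed_dim = 4
random.seed(0)
embeddings = [[random.gauss(0, 1) for _ in range(embed_dim)] for _ in vocab]

def add_token(token, vector):
    """Register a new token and append its embedding row; existing rows are untouched."""
    if token in vocab:
        raise ValueError(f"{token} already exists")
    vocab[token] = len(vocab)
    embeddings.append(list(vector))
    return vocab[token]

# Keep a copy of the original rows to confirm they stay unchanged.
frozen_rows = [row[:] for row in embeddings]
new_id = add_token("<my-style>", [0.1, -0.2, 0.3, 0.4])  # hypothetical learned vector

print(new_id, len(embeddings))
```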

In [None]:
# Using the embeddings from the above link
pipe = StableDiffusionPipeline.from_pretrained("CompVis/stable-diffusion-v1-4",
                                               variant="fp16",
                                               torch_dtype=torch.float16)
pipe = pipe.to("cuda")

For textual inversion, we will be using Hugging Face's **[sd-concepts-library/tim-sale](https://huggingface.co/sd-concepts-library/tim-sale)** style.

In [None]:
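# Use the /resolve/ URL to fetch the raw file. The /blob/ URL serves an HTML page,
# which is the likely cause of the unpickling error when calling torch.load.
embeds_url = "https://huggingface.co/sd-concepts-library/tim-sale/resolve/main/learned_embeds.bin"
embeds_path = FastDownload().download(embeds_url)
# embeds_dict = torch.load(str(embeds_path), map_location=torch.device("cpu"))
embeds_path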

In [None]:
# Loading a copy of the embeddings saved to Google Drive (requires Drive to be mounted)
embeds_new_path = '/content/drive/MyDrive/Learned_Embs/learned_embeds(1).bin'
embeds_dict = torch.load(embeds_new_path, map_location=torch.device("cpu"))

In [None]:
tokenizer = pipe.tokenizer
text_encoder = pipe.text_encoder
new_token, embeds = next(iter(embeds_dict.items()))
embeds = embeds.to(text_encoder.dtype)
new_token

In [None]:
# This new token will be added to the tokenizer and the embeddings to the embeddings table
assert tokenizer.add_tokens(new_token) == 1, "This token already exists!"

In [None]:
text_encoder.resize_token_embeddings(len(tokenizer))
new_token_id = tokenizer.convert_tokens_to_ids(new_token)

text_encoder.get_input_embeddings().weight.data[new_token_id] = embeds

In [None]:
# Running inference and referring to the newly added style
torch.manual_seed(1000)
prompt = "Supergirl smiling in the style of <cat-toy>"
images = pipe(prompt, num_images_per_prompt=4, num_inference_steps=100).images
image_grid(images, 1, 4)

In [None]:
torch.manual_seed(876)
prompt = "The Joker in the style of <cat-toy>"
images = pipe(prompt, num_images_per_prompt=4, guidance_scale=7.5, num_inference_steps=100).images
image_grid(images, 1, 4)

In [None]:
torch.manual_seed(76)
prompt = "Athlete running in the style of <cat-toy>"
images = pipe(prompt, num_images_per_prompt=4, num_inference_steps=100).images
image_grid(images, 1, 4)

In [None]:
torch.manual_seed(78)
prompt = "Close up of Batman in the style of <cat-toy>"
images = pipe(prompt, num_images_per_prompt=4, num_inference_steps=100).images
image_grid(images, 1, 4)

## Dreambooth

This fine-tuning technique introduces new subjects to the model from just a few example images.

The difference from Textual Inversion is that we select an existing token in the vocabulary and fine-tune the model to bring that token close to the images that were provided.

In [None]:
# Recovering GPU memory
torch.cuda.empty_cache()

In [None]:
# We'll stick with JH's example from the lesson,
# using the rare `sks` token to qualify the term `person`
pipe = StableDiffusionPipeline.from_pretrained("pcuenq/jh_dreambooth_1000",
                                               torch_dtype=torch.float16)
pipe = pipe.to("cuda")

In [None]:
torch.manual_seed(44)

prompt = "Painting of sks person in the style of Paul Signac"
images = pipe(prompt, num_images_per_prompt=4).images
image_grid(images, 1, 4)

## Latents and Callbacks

Standard diffusion has a major downside: the reverse denoising process is slow, and the models consume a lot of memory because they operate in pixel space.

Instead of working in pixel space, we can run the diffusion process in a lower-resolution latent space. This is the key difference between standard and latent diffusion models, and Stable Diffusion belongs to the latter class.
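
A quick back-of-the-envelope sketch of the saving, assuming the 512x512 RGB images used in this notebook and a latent with 4 channels at 1/8 of the spatial resolution (the layout used by Stable Diffusion v1, stated here as an assumption):

```python
height, width = 512, 512

pixel_values = height * width * 3                 # RGB image
latent_values = (height // 8) * (width // 8) * 4  # 4-channel latent at 1/8 resolution

print(pixel_values, latent_values, pixel_values / latent_values)
```

Under these assumptions each denoising step works on 48 times fewer values than it would in pixel space.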



In [None]:
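# The Stable Diffusion pipeline can send intermediate latents to a callback function.
# Running these latents through the image decoder, i.e. the VAE component, lets us
# observe the denoising process.
# Note: recent diffusers releases deprecate `callback`/`callback_steps` in favor of
# `callback_on_step_end`, so this cell may need adapting on newer versions.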
vae = pipe.vae
images = []

def latents_callback(i, t, latents):
  latents = 1 / 0.18215 * latents
  image = vae.decode(latents).sample[0]
  image = (image / 2 + 0.5).clamp(0, 1)
  image = image.cpu().permute(1, 2, 0).numpy()
  images.extend(pipe.numpy_to_pil(image))

prompt = "Oil on canvas portrait of Gandhi reading a book."
torch.manual_seed(1452)

final_image = pipe(prompt, callback=latents_callback, callback_steps=8).images[0]
images.append(final_image)
image_grid(images, rows=1, cols=len(images))

To take a closer look at the pipeline, we'll first get rid of the existing `pipe` object.

In [None]:
del pipe

## Recreating the Pipeline From Scratch

In [None]:
# Recovering GPU memory
torch.cuda.empty_cache()

In [None]:
from transformers import CLIPTextModel, CLIPTokenizer

In [None]:
tokenizer = CLIPTokenizer.from_pretrained("openai/clip-vit-large-patch14")
text_encoder = CLIPTextModel.from_pretrained("openai/clip-vit-large-patch14", torch_dtype=torch.float16).to("cuda")

In [None]:
# Loading the VAE and the UNET
from diffusers import AutoencoderKL, UNet2DConditionModel
# The VAE below has been fine tuned for more steps
vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-ema", torch_dtype=torch.float16).to("cuda")
unet = UNet2DConditionModel.from_pretrained("CompVis/stable-diffusion-v1-4", subfolder="unet", torch_dtype=torch.float16).to("cuda")

The lesson notebook uses a different scheduler, called the K-LMS Scheduler. Also, we will need to use the same noising schedule that was used during training.

In [None]:
# The K-LMS scheduler evolves betas over 1000 steps as follows
beta_start, beta_end = 0.00085, 0.012
plt.plot(torch.linspace(beta_start**0.5, beta_end**0.5, 1000) ** 2)
plt.xlabel('Timestep')
plt.ylabel('β');

In [None]:
from diffusers import LMSDiscreteScheduler

scheduler = LMSDiscreteScheduler(beta_start=beta_start, beta_end=beta_end, beta_schedule="scaled_linear",
                                 num_train_timesteps=1000)

In [None]:
# Defining parameters to be used for generation

prompt = ["a photograph of a scuba diver riding a bicycle"]

height = 512
width = 512
num_inference_steps = 75
guidance_scale = 7.5
batch_size = 1

In [None]:
# Tokenizing the prompt
text_input = tokenizer(prompt, padding="max_length", max_length=tokenizer.model_max_length,
                       truncation=True, return_tensors="pt")
text_input['input_ids']

In [None]:
# 49407 is the end-of-text token, which is also used as the padding token
tokenizer.decode(49407)

In [None]:
# This is what the attention mask looks like
text_input['attention_mask']
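
The padding and attention-mask layout can be reproduced by hand. The sketch below assumes CLIP's start-of-text id `49406`, end-of-text id `49407`, and maximum length of 77. The word ids passed in are hypothetical placeholders, and `pad_ids` is an illustrative helper, not the tokenizer's implementation.

```python
BOS, EOS = 49406, 49407  # CLIP start-of-text / end-of-text ids
MAX_LEN = 77             # CLIP's model_max_length

def pad_ids(word_ids, max_len=MAX_LEN):
    """Wrap word ids in BOS/EOS, truncate, then pad with EOS to a fixed length."""
    ids = [BOS] + list(word_ids)[: max_len - 2] + [EOS]
    # 1 marks real tokens (including BOS/EOS), 0 marks padding.
    mask = [1] * len(ids) + [0] * (max_len - len(ids))
    ids = ids + [EOS] * (max_len - len(ids))
    return ids, mask

ids, mask = pad_ids([320, 1125, 539])  # hypothetical word ids
print(len(ids), ids[:6], sum(mask))
```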

In [None]:
# The text encoder provides embeddings for our prompt
text_embeddings = text_encoder(text_input.input_ids.to("cuda"))[0].half()
text_embeddings.shape

In [None]:
# Getting the embeddings required for unconditional generation. An empty prompt
# gives the model no conditioning, so it is free to generate anything.
max_length = text_input.input_ids.shape[-1]
uncond_input = tokenizer(
    [""] * batch_size, padding="max_length", max_length=max_length, return_tensors="pt"
)
uncond_embeddings = text_encoder(uncond_input.input_ids.to("cuda"))[0].half()
uncond_embeddings.shape

In [None]:
# Classifier-free guidance needs two predictions: one with the conditioned (prompt)
# embeddings and one with the unconditional embeddings. Concatenating both into a
# single batch lets us get them from one forward pass instead of two.
text_embeddings = torch.cat([uncond_embeddings, text_embeddings])

# Denoising starts from pure Gaussian noise, which becomes our initial latents
torch.manual_seed(100)
latents = torch.randn((batch_size, unet.config.in_channels, height // 8, width // 8))
latents = latents.to("cuda").half()
latents.shape

In [None]:
# Initializing the scheduler
scheduler.set_timesteps(num_inference_steps)
# Scaling the initial noise by the standard deviation required by the scheduler
latents = latents * scheduler.init_noise_sigma

In [None]:
# This is what the outputs of the above look like
scheduler.timesteps

In [None]:
#...and
scheduler.sigmas

In [None]:
plt.plot(scheduler.timesteps, scheduler.sigmas[:-1]);
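
For intuition about where the sigma values come from, the sketch below rebuilds the scaled-linear beta schedule in plain Python and applies the common conversion `sigma = sqrt((1 - alpha_bar) / alpha_bar)`, where `alpha_bar` is the running product of `1 - beta`. It covers the 1000 training timesteps only. The scheduler's own values at the chosen inference timesteps are interpolated from these, so treat this as an approximation of the idea rather than the scheduler's exact code.

```python
import math

beta_start, beta_end, n_steps = 0.00085, 0.012, 1000
lo, hi = math.sqrt(beta_start), math.sqrt(beta_end)

sigmas = []
alpha_bar = 1.0
for i in range(n_steps):
    # Scaled linear: interpolate linearly in sqrt(beta), then square.
    beta = (lo + (hi - lo) * i / (n_steps - 1)) ** 2
    alpha_bar *= 1.0 - beta
    # Noise-to-signal ratio at this training timestep.
    sigmas.append(math.sqrt((1.0 - alpha_bar) / alpha_bar))

print(round(sigmas[0], 4), round(sigmas[-1], 2))
```

The largest sigma is the scale applied to the initial random latents, which is why they are multiplied by `scheduler.init_noise_sigma` before the loop starts.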

In [None]:
# Denoising Loop
from tqdm.auto import tqdm

for i, t in enumerate(tqdm(scheduler.timesteps)):
  # Duplicate the latents so one batch covers the unconditional and text-conditioned inputs
  model_input = torch.cat([latents] * 2)
  model_input = scheduler.scale_model_input(model_input, t)

  # Predict the noise residual
  with torch.no_grad():
    pred = unet(model_input, t, encoder_hidden_states=text_embeddings).sample

  # Perform guidance
  pred_uncond, pred_text = pred.chunk(2)
  pred = pred_uncond + guidance_scale * (pred_text - pred_uncond)

  # Compute the previous noisy sample
  latents = scheduler.step(pred, t, latents).prev_sample
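
The shape of this loop is easier to see on a one-dimensional toy problem. The sketch below is not K-LMS: it swaps in a plain Euler step, replaces the UNet with an "oracle" noise predictor for a dataset that contains a single value `x0`, and uses a made-up list of noise levels. All names and numbers are assumptions chosen for illustration.

```python
import random

random.seed(0)
x0 = 2.5                                  # the "clean" value the toy model knows
sigmas = [10.0, 5.0, 2.0, 1.0, 0.5, 0.0]  # hypothetical decreasing noise levels

def predict_noise(x, sigma):
    """Oracle noise predictor for a dataset containing only x0."""
    return (x - x0) / sigma

# Start from a heavily noised sample at the largest noise level.
x = x0 + sigmas[0] * random.gauss(0, 1)
for sigma, sigma_next in zip(sigmas[:-1], sigmas[1:]):
    eps = predict_noise(x, sigma)
    x = x + (sigma_next - sigma) * eps    # Euler step toward the next noise level

print(x)
```

Each iteration predicts the noise at the current level and removes just enough of it to land on the next level, which mirrors what `scheduler.step` does with the UNet's prediction.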


In [None]:
# Our latents now contain the denoised representation of the image.
# The VAE decoder converts it back to pixel space.
with torch.no_grad():
  image = vae.decode(1 / 0.18215 * latents).sample

In [None]:
# Converting the image to PIL
image = (image / 2 + 0.5).clamp(0, 1)
image = image[0].detach().cpu().permute(1, 2, 0).numpy()
image = (image * 255).round().astype("uint8")
Image.fromarray(image)
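
The conversion above is a per-value rescale from the decoder's roughly `[-1, 1]` range to 8-bit pixels. A scalar sketch of the same arithmetic, with `to_uint8` as a hypothetical helper:

```python
def to_uint8(v):
    """Map a decoder output value in roughly [-1, 1] to an 8-bit pixel value."""
    scaled = min(max(v / 2 + 0.5, 0.0), 1.0)  # shift to [0, 1] and clamp
    return round(scaled * 255)

print([to_uint8(v) for v in [-1.3, -1.0, 0.0, 1.0, 1.7]])
```

Values outside `[-1, 1]` are clamped, so they map to pure black or pure white rather than wrapping around.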

## Putting The Above Into Functions

In [None]:
prompts = [
    "a photograph of a scuba diver riding a bicycle",
    "a pastel painting of a scuba diver riding a bicycle in the style of Degas "
]

In [None]:
def text_enc(prompts, max_len=None):
  if max_len is None: max_len = tokenizer.model_max_length
  inp = tokenizer(prompts, padding="max_length", max_length=max_len, truncation=True,
                  return_tensors="pt")
  return text_encoder(inp.input_ids.to("cuda"))[0].half()

def mk_img(t):
  image = (t / 2 + 0.5).clamp(0, 1).detach().cpu().permute(1, 2, 0).numpy()
  return Image.fromarray((image * 255).round().astype("uint8"))

In [None]:
def mk_samples(prompts, g=11, seed=100, steps=75):
  bs = len(prompts)
  text = text_enc(prompts)
  uncond = text_enc([""] * bs, text.shape[1])
  emb = torch.cat([uncond, text])
  if seed is not None: torch.manual_seed(seed)

  latents = torch.randn((bs, unet.config.in_channels, height // 8, width // 8))
  scheduler.set_timesteps(steps)
  latents = latents.to("cuda").half() * scheduler.init_noise_sigma

  for i, ts in enumerate(tqdm(scheduler.timesteps)):
    inp = scheduler.scale_model_input(torch.cat([latents] * 2), ts)
    with torch.no_grad(): u, t = unet(inp, ts, encoder_hidden_states=emb).sample.chunk(2)
    pred = u + g * (t - u)
    latents = scheduler.step(pred, ts, latents).prev_sample

  with torch.no_grad(): return vae.decode(1 / 0.18215 * latents).sample

In [None]:
images = mk_samples(prompts)

In [None]:
from IPython.display import display

for img in images: display(mk_img(img))