## Practical session 2: Stable Diffusion
In the previous part of the practical session, we trained a conditional model on a 2D dataset.  
In this session, we will generate images with Stable Diffusion, a diffusion model that is able to generate images conditioned on a text prompt.  
A simple way to experiment with Stable Diffusion is to use the `diffusers` library, which provides a simple interface to work with diffusion models.  
Let's install it and import the necessary libraries.

In [None]:
%pip install -Uq diffusers

In [None]:
import torch
from PIL import Image
from matplotlib import pyplot as plt

device = "cuda" if torch.cuda.is_available() else "cpu"
from diffusers import StableDiffusionPipeline

We will use the `stabilityai/stable-diffusion-2-1-base` model, which was pretrained on the `laion-aesthetic` dataset.  
Such models can be downloaded directly from the Hugging Face hub.  
One possible issue with these models is that they can be quite large (around 1.5GB) and require a lot of GPU memory.  
We will use the `fp16` version of the model to save some GPU memory.

In [None]:
model_id = "stabilityai/stable-diffusion-2-1-base"
pipe = StableDiffusionPipeline.from_pretrained(model_id, revision="fp16", torch_dtype=torch.float16).to(device) # we load a fp16 version of the model to save some GPU memory
pipe.enable_attention_slicing() # attention slicing is another trick allowing video memory reduction

Diffusion models are stochastic models, meaning that they generate images by sampling from a probability distribution.  
To generate a deterministic image, we will use a torch generator to fix the random seed and thus get a deterministic output.  
The `diffusers` library provides a simple way to generate images with a given seed through a `Pipeline` object.  
Here is an example of how to generate an image with a given seed.

In [None]:
generator = torch.Generator(device=device).manual_seed(42)
prompt = "a photo of duck with sunglasses riding doing surf at a tropical beach"

pipe_output = pipe(
    prompt=prompt,
    negative_prompt="Oversaturated, blurry, low quality", # negative prompt to avoid unwanted artifacts; it is only used by classifier-free guidance, so it has no effect here
    height=512, width=512,
    guidance_scale=0, # classifier-free guidance scale, 0 disables classifier-free guidance
    num_inference_steps=35, # number of diffusion steps
    generator=generator,
)

pipe_output.images[0]

Not super impressive, right? You may have obtained something a bit abstract and not very related to the prompt.  
We saw in class that even when trained with conditioning, the model can still generate images that are not related to the prompt.  
To force the model to generate images that are related to the prompt, we can use the classifier-free guidance technique.  
At each denoising step, this technique computes two noise predictions from the same latent, one with the prompt and one without a prompt (or with a negative prompt), and then moves the predicted $x_{t-1}$ a bit further in the direction of the prompted prediction.  
Try setting a high guidance scale (like 8), together with the negative prompt, to see if it helps.

In [None]:
generator = torch.Generator(device=device).manual_seed(42)

pipe_output = pipe(
    prompt="a photo of duck with sunglasses riding doing surf at a tropical beach",
    negative_prompt="Oversaturated, blurry, low quality",
    height=512, width=512,
    guidance_scale=8,
    num_inference_steps=35,
    generator=generator,
)

pipe_output.images[0]

Way better, no? Play a little with the different parameters to get an intuition of their importance.

In [None]:
seed = 42 #@param
generator = torch.Generator(device=device).manual_seed(seed)  # use the seed parameter defined above

pipe_output = pipe(
    prompt="a photo of duck with sunglasses riding doing surf at a tropical beach", #@param
    negative_prompt="Oversaturated, blurry, low quality",  #@param
    height=512, width=512,
    guidance_scale=8,          #@param
    num_inference_steps=35,    #@param
    generator=generator,
)

pipe_output.images[0]

We saw in class that Stable Diffusion is a latent diffusion model.  
This means that images are first projected to a latent space, that the diffusion process runs in this latent space, and that the result is then decoded back to an image.  
Let's now dive a bit in the different elements composing our pipeline.  
The pipeline is composed of:
*    a VAE
*    a text encoder and a tokenizer
*    a U-Net
*    a scheduler


Let's start with the VAE and have a look at the architecture.

In [None]:
vae = pipe.vae
vae

This VAE is quite special and does not compress the information that much: the latent keeps a spatial layout.  
The image width and height are divided by 8 and the number of channels goes from 3 (RGB) to 4, so a 512x512 image becomes a 4x64x64 latent.  
Let's see how it works and how much information is lost.
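
As a quick sanity check of these numbers, here is a minimal sketch in plain Python. The helper names `latent_shape` and `compression_ratio` are ours (they are not part of `diffusers`), and the downscale factor of 8 and the 4 latent channels are taken from the description above.

```python
def latent_shape(height, width, downscale=8, latent_channels=4):
    """Shape (channels, height, width) of the latent for an RGB image."""
    if height % downscale or width % downscale:
        raise ValueError("height and width must be multiples of the downscale factor")
    return (latent_channels, height // downscale, width // downscale)


def compression_ratio(height, width, downscale=8, latent_channels=4):
    """Number of pixel values divided by the number of latent values."""
    c, h, w = latent_shape(height, width, downscale, latent_channels)
    return (3 * height * width) / (c * h * w)


print(latent_shape(512, 512))       # (4, 64, 64)
print(compression_ratio(512, 512))  # 48.0
```

So the latent holds 48 times fewer values than the image, while still looking like a small image.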

In [None]:
!wget https://i5.cloudfable.net/styles/735x735/119.111/White/quelquun-en-californie-maime-palm-island-beach-t-shirt-a-manches-longues-20241118021552-q2v1webg-s6.jpg -O image.jpg

In [None]:
from PIL import Image
input_image = Image.open('image.jpg').resize((512, 512))
input_image

We will first convert the image to a tensor, add a batch dimension and send it to the device.  
Then we will scale it to the range [-1, 1].
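
The scaling and its inverse can be sketched on single pixel values as follows. This is only an illustration in plain Python with helper names of our own choosing; the notebook applies the same arithmetic to whole tensors.

```python
def to_model_range(x):
    """Map a pixel value from [0, 1] (ToTensor output) to [-1, 1] (VAE input)."""
    return 2.0 * x - 1.0


def to_image_range(y):
    """Map a VAE output back to [0, 1], clamping values that overshoot."""
    return min(1.0, max(0.0, y / 2.0 + 0.5))


for pixel in (0.0, 0.25, 1.0):
    scaled = to_model_range(pixel)
    print(pixel, "->", scaled, "->", to_image_range(scaled))
```

The clamp is needed because the decoder output is not guaranteed to stay exactly within [-1, 1].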

In [None]:
import torchvision.transforms as transforms
transform = transforms.ToTensor()
tensor_image = transform(input_image)
# we add a batch dimension, send it to device in fp16 and scale it
tensor_image = torch.unsqueeze(tensor_image, 0).to(device=device, dtype=torch.float16)
tensor_image = tensor_image * 2 - 1

We can now compute the image latent.

In [None]:
with torch.no_grad():
  latent = vae.encode(tensor_image).latent_dist.sample()
  
fig, axs = plt.subplots(1, 4, figsize=(16, 4))
for c in range(4):
    axs[c].imshow(latent[0][c].cpu(), cmap='Greys')

You may see that even in the latent space, the image is still recognizable.  
Let's now decode it back to the image space.

In [None]:
with torch.no_grad():
  reconstruction = vae.decode(latent).sample
  reconstruction = (reconstruction / 2 + 0.5).clamp(0, 1)
plt.figure(figsize=(12, 12))
plt.imshow(reconstruction[0].permute(1, 2, 0).to('cpu', dtype=torch.float32))
plt.axis("off")

You should see that the image is almost identical to the original one.  
But if you look closely, you may see that there are some artifacts.  
For instance the text on the t-shirt is not exactly the same.  
This is because the VAE is not perfect and some information is lost in the process.  
Let's now look at the parts responsible for the text.  
Our model is prompted with a text, which first has to be turned into text embeddings.  
To do so, the pipeline first uses a tokenizer to convert the text to tokens.  
Then it uses the text encoder to convert the tokens to embeddings.  
Let's see how it works.

In [None]:
tokenizer = pipe.tokenizer
text_encoder = pipe.text_encoder
print(text_encoder)

We can see that this model uses a CLIP text encoder.  
CLIP stands for Contrastive Language-Image Pretraining.  It is a model that was trained on a large amount of text-image pairs and is able to project text and images to a joint embedding space.  
Let's see how all this works, first by tokenizing the prompt.

In [None]:
prompt = "a photo of duck with sunglasses riding doing surf at a tropical beach"
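indexes = tokenizer(prompt)['input_ids']  # list of token ids for this single prompt
for idx in indexes:
  # decode each token id separately to see which piece of text it stands for
  print(f"{idx} -> {tokenizer.decode([idx])}")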

The prompt is converted to a sequence of tokens.  
Then we use the text encoder to convert the tokens to embeddings.  

In [None]:
text_embeddings = pipe.encode_prompt(prompt, device, 1, False, '')[0]
text_embeddings.shape

The text encoder outputs a sequence of 77 token embeddings, each of size 1024.  
Try with another prompt: you should see the exact same output shape, because prompts are padded or truncated to 77 tokens.  
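
The fixed length can be illustrated with a small sketch. The function `pad_or_truncate` and the special token ids used here (1 for start, 2 for end, 0 for padding) are hypothetical placeholders, not the real ids of the CLIP tokenizer; only the length of 77 comes from the shape observed above.

```python
def pad_or_truncate(token_ids, max_length=77, bos_id=1, eos_id=2, pad_id=0):
    """Return exactly `max_length` ids: start token, prompt tokens, end token, padding."""
    body = list(token_ids)[: max_length - 2]  # keep room for the start and end tokens
    ids = [bos_id] + body + [eos_id]
    return ids + [pad_id] * (max_length - len(ids))


short_prompt = [10, 11, 12]          # hypothetical ids of a 3-token prompt
long_prompt = list(range(100, 300))  # hypothetical ids of a 200-token prompt

print(len(pad_or_truncate(short_prompt)), len(pad_or_truncate(long_prompt)))
```

Whatever the prompt length, the text encoder therefore always receives a sequence of the same size.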

Now let's have a look at the U-Net.  
The U-Net is a convolutional neural network that predicts the noise to remove from the latent at each denoising step.  
It is a modified version of the U-Net architecture that was originally proposed for medical image segmentation.  
It is composed of a contracting path and an expanding path.  
The contracting path is used to extract features from the noisy latent and the expanding path is used to build the prediction back at the input resolution.  
You may also observe that it contains self-attention layers and cross-attention layers. The cross-attention layers are used here to attend to the text embeddings and condition the image generation process on the text.  
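
To give an intuition of cross-attention, here is a minimal sketch in plain Python. It is a simplification under our own assumptions: the real layers first apply learned linear projections to obtain queries, keys and values and use several attention heads, which we skip here. Queries stand for image (latent) features, keys and values for text embeddings.

```python
import math


def softmax(scores):
    """Numerically stable softmax over a list of floats."""
    m = max(scores)
    exps = [math.exp(s - m) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


def cross_attention(queries, keys, values):
    """Each query (image position) takes a weighted average of the values (text tokens)."""
    d = len(queries[0])
    outputs, weights = [], []
    for q in queries:
        # Scaled dot product between this image position and every text token
        scores = [sum(qi * ki for qi, ki in zip(q, k)) / math.sqrt(d) for k in keys]
        w = softmax(scores)
        weights.append(w)
        outputs.append([sum(wj * v[c] for wj, v in zip(w, values))
                        for c in range(len(values[0]))])
    return outputs, weights


queries = [[1.0, 0.0], [0.0, 1.0]]            # 2 image positions
keys = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]   # 3 text tokens
values = [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]]
outputs, weights = cross_attention(queries, keys, values)
print(weights)
```

Each row of `weights` sums to 1 and tells how much each text token contributes to a given image position.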

In [None]:
pipe.unet

We will now look at the scheduler.  
Diffusion models are often trained with a classical scheduler like the DDPM scheduler we saw in class.  
Yet at inference time, we often use a different scheduler that needs far fewer steps, in order to sample faster.  
Let's look at the one currently used by the pipeline.  
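
Sampling faster mostly means visiting only a subset of the training timesteps. Here is a minimal sketch, assuming 1000 training timesteps and evenly spaced inference steps. The function name `inference_timesteps` is ours, and real schedulers may add an offset or a few extra steps, so the values stored in `scheduler.timesteps` can differ slightly.

```python
def inference_timesteps(num_inference_steps, num_train_timesteps=1000, steps_offset=0):
    """Evenly spaced subset of the training timesteps, from most noisy to least noisy."""
    step_ratio = num_train_timesteps // num_inference_steps
    steps = [i * step_ratio + steps_offset for i in range(num_inference_steps)]
    return steps[::-1]


timesteps = inference_timesteps(50)
print(timesteps[:5], "...", timesteps[-3:])
```

With 50 steps, only one training timestep out of 20 is visited.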

In [None]:
scheduler = pipe.scheduler
print(scheduler)

We can see that it is a PNDMScheduler.  
The PNDM scheduler keeps the noise schedule used for DDPM training but relies on a multistep numerical method to sample in far fewer steps.  
Let's look at the number of inference steps, the timesteps and the alphas_cumprod.      
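
For reference, `alphas_cumprod` is the running product of $1 - \beta_t$ over the training timesteps. The sketch below recomputes it in plain Python, assuming a "scaled linear" beta schedule with `beta_start=0.00085`, `beta_end=0.012` and 1000 steps. We believe these match the Stable Diffusion configuration, but they are assumptions here: check `scheduler.config` to confirm.

```python
def scaled_linear_alphas_cumprod(num_train_timesteps=1000, beta_start=0.00085, beta_end=0.012):
    """Cumulative product of (1 - beta_t), with betas linear in sqrt space."""
    lo, hi = beta_start ** 0.5, beta_end ** 0.5
    alphas_cumprod, running = [], 1.0
    for i in range(num_train_timesteps):
        beta = (lo + (hi - lo) * i / (num_train_timesteps - 1)) ** 2
        running *= 1.0 - beta
        alphas_cumprod.append(running)
    return alphas_cumprod


acp = scaled_linear_alphas_cumprod()
print(acp[0], acp[499], acp[-1])
```

The values start close to 1 (almost no noise) and decrease towards 0 (almost pure noise).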

In [None]:
print(scheduler.num_inference_steps)
print(scheduler.timesteps)

In [None]:
print(scheduler.alphas_cumprod[scheduler.timesteps.cpu()])

In [None]:
plt.plot(scheduler.alphas_cumprod)

We can use this scheduler to generate a noisy image.  
A small detail to pay attention to here is that Stable Diffusion scales the latents by a factor of 0.18215.  
This ensures that the latents have a consistent scale, close to unit variance, so that the model is able to learn effectively.  
Thus we need to scale before adding noise and then scale back before decoding.
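
As a reminder of what adding noise means, here is a minimal sketch of the forward process formula $x_t = \sqrt{\bar\alpha_t}\, x_0 + \sqrt{1 - \bar\alpha_t}\, \epsilon$ on plain Python lists. The function below is our own illustration of the formula, not the `diffusers` implementation.

```python
import math
import random


def add_noise(x0, noise, alpha_cumprod_t):
    """Mix clean values and noise according to the cumulative alpha at timestep t."""
    signal_scale = math.sqrt(alpha_cumprod_t)
    noise_scale = math.sqrt(1.0 - alpha_cumprod_t)
    return [signal_scale * x + noise_scale * n for x, n in zip(x0, noise)]


rng = random.Random(0)
x0 = [1.0, -0.5, 0.25]
noise = [rng.gauss(0.0, 1.0) for _ in x0]

# A cumulative alpha close to 1 keeps the signal, close to 0 keeps mostly the noise
for alpha_cumprod_t in (0.99, 0.5, 0.01):
    print(alpha_cumprod_t, add_noise(x0, noise, alpha_cumprod_t))
```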

In [None]:
noise = torch.randn_like(latent) # Random noise
sampling_step = 25
noisy_latent = scheduler.add_noise(0.18215 *latent, noise, timesteps=torch.tensor([scheduler.timesteps[sampling_step]])) / 0.18215
with torch.no_grad():
  reconstruction = vae.decode(noisy_latent).sample
  reconstruction = (reconstruction / 2 + 0.5).clamp(0, 1)
plt.figure(figsize=(12, 12))
plt.imshow(reconstruction[0].permute(1, 2, 0).to('cpu', dtype=torch.float32))
plt.axis("off")

Try it for different levels of noise.  

The following code animates the forward diffusion process.

In [None]:
from matplotlib.animation import FuncAnimation
import matplotlib.pyplot as plt
from IPython.display import HTML
from functools import partial
from typing import List

noisy_images = []
for sampling_step in range(scheduler.num_inference_steps)[::-1]:
  noise = torch.randn_like(latent)
  noisy_latent = scheduler.add_noise(0.18215 *latent, noise, timesteps=torch.tensor([scheduler.timesteps[sampling_step]])) / 0.18215
  with torch.no_grad():
    reconstruction = vae.decode(noisy_latent).sample
    reconstruction = (reconstruction / 2 + 0.5).clamp(0, 1)[0].permute(1, 2, 0).to('cpu', dtype=torch.float32)
    noisy_images.append(reconstruction)

fig, ax = plt.subplots(figsize=(6, 6))

def animate(i:int, series:List[torch.Tensor]):
    ax.clear()
    ax.imshow(series[i])
    ax.set_axis_off()

animate_forward = partial(animate, series=noisy_images)

anim = FuncAnimation(fig, animate_forward, frames=len(noisy_images),
                    interval=250)  # 250 ms between frames

HTML(anim.to_jshtml())

It is possible to use other schedulers as long as they are initialized with the same configuration.  
Let's try the EulerDiscreteScheduler.    



In [None]:
from diffusers import EulerDiscreteScheduler
pipe.scheduler = EulerDiscreteScheduler.from_config(pipe.scheduler.config)
pipe_output = pipe(
    prompt="a photo of duck with sunglasses riding doing surf at a tropical beach",
    negative_prompt="Oversaturated, blurry, low quality",
    height=512, width=512,
    guidance_scale=8,
    num_inference_steps=50,
    generator=generator,
)

pipe_output.images[0]

We will now write a function to generate an image.  
Even if the pipeline directly provides a method to generate an image, it is good practice to write your own function.  
This will help you understand how the pipeline works and will allow you to customize it more easily.  
Complete the following function and try to generate an image.  

In [None]:
def generate_image(
    pipe,
    prompt,
    num_steps=30,
    seed=42,
    device="cuda"
):
    generator = torch.Generator(device=device).manual_seed(seed)

    # Get text embeddings
    embeddings = pipe.encode_prompt(prompt, device, 1, False, '')[0]

    # Setup diffusion timesteps first: for some schedulers init_noise_sigma depends on them
    pipe.scheduler.set_timesteps(num_steps, device=device)

    # Initialize random latents (shape 1,4,64,64)
    latents = torch.randn(
        (1, 4, 64, 64),
        device=device,
        dtype=torch.float16,
        generator=generator
    ) * pipe.scheduler.init_noise_sigma

    # Denoising loop
    for t in pipe.scheduler.timesteps:
        # Scale input according to scheduler
        model_input = pipe.scheduler.scale_model_input(latents, t)

        # Get model prediction
        with torch.no_grad():
            pred = pipe.unet(model_input, t, encoder_hidden_states=embeddings).sample

        # Denoise step
        latents = pipe.scheduler.step(pred, t, latents).prev_sample

    # Convert latents to image
    with torch.no_grad():
        image = pipe.decode_latents(latents.detach())

    return pipe.numpy_to_pil(image)[0]

gen_img = generate_image(pipe, prompt)
gen_img

Arf! We have probably run into the same problem as with our first generation.  
Indeed, the current sampling loop does not use classifier-free guidance.  
Let's fix that.  
To implement classifier-free guidance, we need two noise predictions at each step: one for the unconditional prompt and one for the conditional prompt.  
The guidance is the difference between the noise predicted for the conditional prompt and the noise predicted for the unconditional prompt.  
We then do one denoising step using the unconditional noise, to which we add the guidance weighted by the guidance scale.  
This can be summarized by the following formula:
$$
\text{noise\_pred\_uncond} + \text{guidance\_scale} \times (\text{noise\_pred\_text} - \text{noise\_pred\_uncond})
$$
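
To make the formula concrete, here is a minimal sketch on plain Python lists. The function name `apply_guidance` is ours, and the numbers are made up for illustration.

```python
def apply_guidance(noise_pred_uncond, noise_pred_text, guidance_scale):
    """Start from the unconditional prediction and move towards the text prediction."""
    return [u + guidance_scale * (t - u)
            for u, t in zip(noise_pred_uncond, noise_pred_text)]


uncond = [0.0, 1.0]
text = [1.0, 3.0]
for scale in (1.0, 2.0, 8.0):
    print(scale, apply_guidance(uncond, text, scale))
```

With a scale of 1 the formula returns exactly the text-conditioned prediction, and larger scales extrapolate beyond it, further away from the unconditional prediction.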
Here we will use a negative prompt instead of an unconditional prompt to compute the guidance.
Complete the following function and try to generate an image.

In [None]:
def generate_image(
    pipe,
    prompt,
    negative_prompt="",
    guidance_scale=7.5,
    num_steps=30,
    seed=42,
    device="cuda"
):
    generator = torch.Generator(device=device).manual_seed(seed)

    # Get text embeddings for both conditional and unconditional
    uncond_embeddings = pipe.encode_prompt(negative_prompt, device, 1, False, '')[0]
    text_embeddings = pipe.encode_prompt(prompt, device, 1, False, '')[0]

    # Concatenate the embeddings
    embeddings = torch.cat([uncond_embeddings, text_embeddings])

    # Setup diffusion timesteps first: for some schedulers init_noise_sigma depends on them
    pipe.scheduler.set_timesteps(num_steps, device=device)

    # Initialize random latents (shape 1,4,64,64)
    latents = torch.randn(
        (1, 4, 64, 64),
        device=device,
        dtype=torch.float16,
        generator=generator
    ) * pipe.scheduler.init_noise_sigma

    # Denoising loop
    for t in pipe.scheduler.timesteps:
        # Double the latents for conditional and unconditional
        latent_model_input = torch.cat([latents] * 2)

        # Scale input according to scheduler
        latent_model_input = pipe.scheduler.scale_model_input(latent_model_input, t)

        # Get model prediction
        with torch.no_grad():
            noise_pred = pipe.unet(latent_model_input, t, encoder_hidden_states=embeddings).sample

        # Perform guidance
        noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
        noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

        # Denoise step
        latents = pipe.scheduler.step(noise_pred, t, latents).prev_sample

    # Convert latents to image
    with torch.no_grad():
        image = pipe.decode_latents(latents.detach())

    return pipe.numpy_to_pil(image)[0]

# Example usage
gen_img = generate_image(
    pipe,
    prompt,
    negative_prompt="low quality, blurry",
    guidance_scale=7.5
)
gen_img

In the previous notebook we used a DDIM scheduler to generate an image faster.  
DDIM is a scheduler that can be used to generate an image faster than the DDPM scheduler.  
A key property of DDIM is that it is deterministic, meaning that the same input will always produce the same output.  
This is in contrast to DDPM, which is stochastic and produces different outputs for the same input.  
Since it generates deterministic trajectories, we can also use DDIM to compute an 'inversion'.  
Inversion is the process of finding the noisy latent from which sampling reproduces the specified image.  

Disclaimer: the rest of this notebook is very largely inspired by the excellent notebooks from [Jonathan Whitaker](https://johnowhitaker.dev/). I really encourage you to look at all of his tutorials if you want to learn more about diffusion models and the diffusers library.

The following functions compute the inversion of an image and sample from a given latent.  `invert` generates a sequence of increasingly noisy latents, ending with a noise from which sampling with DDIM should lead to an image close to the original one.
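
Before reading the full functions, here is a minimal scalar sketch of why a deterministic update can be inverted. The helper `ddim_move` is our own naming. It applies the same update whether we go towards less noise (sampling) or towards more noise (inversion), and it assumes that the same noise prediction is used in both directions.

```python
import math


def ddim_move(x, eps, alpha_from, alpha_to):
    """Move a latent from noise level alpha_from to alpha_to (cumulative alphas)."""
    predicted_x0 = (x - math.sqrt(1.0 - alpha_from) * eps) / math.sqrt(alpha_from)
    return math.sqrt(alpha_to) * predicted_x0 + math.sqrt(1.0 - alpha_to) * eps


x_t, eps = 0.3, -1.2            # made-up latent value and noise prediction
alpha_t, alpha_prev = 0.5, 0.8  # made-up cumulative alphas (less noise at alpha_prev)

x_prev = ddim_move(x_t, eps, alpha_t, alpha_prev)     # one sampling step
x_back = ddim_move(x_prev, eps, alpha_prev, alpha_t)  # the inverted step
print(x_t, x_prev, x_back)
```

In practice the U-Net predicts a slightly different noise at `x_prev` than at `x_t`, so the inversion is only approximate.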

In [None]:
from tqdm.notebook import tqdm

from diffusers import DDIMScheduler
pipe.scheduler = DDIMScheduler.from_config(pipe.scheduler.config)

## Inversion
@torch.no_grad()
def invert(start_latents, prompt, guidance_scale=3.5, num_inference_steps=80,
           num_images_per_prompt=1, do_classifier_free_guidance=True,
           negative_prompt='', device=device):

    # Encode prompt
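    # encode_prompt returns (prompt embeddings, negative prompt embeddings)
    prompt_embeds, negative_embeds = pipe.encode_prompt(
            prompt, device, num_images_per_prompt, do_classifier_free_guidance, negative_prompt
    )
    # Negative embeddings go first, matching the chunk(2) order used for guidance
    text_embeddings = torch.cat([negative_embeds, prompt_embeds]) if do_classifier_free_guidance else prompt_embeds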

    # Latents are now the specified start latents
    latents = start_latents.clone()

    # We'll keep a list of the inverted latents as the process goes on
    intermediate_latents = []

    # Set num inference steps
    pipe.scheduler.set_timesteps(num_inference_steps, device=device)

    # Reversed timesteps <<<<<<<<<<<<<<<<<<<<
    timesteps = reversed(pipe.scheduler.timesteps)

    for i in tqdm(range(1, num_inference_steps), total=num_inference_steps-1):

        # We'll skip the final iteration
        if i >= num_inference_steps - 1: continue

        t = timesteps[i]

        # Expand the latents if we are doing classifier free guidance
        latent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latents
        latent_model_input = pipe.scheduler.scale_model_input(latent_model_input, t)

        # Predict the noise residual
        noise_pred = pipe.unet(latent_model_input, t, encoder_hidden_states=text_embeddings).sample

        # Perform guidance
        if do_classifier_free_guidance:
            noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
            noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

        current_t = max(0, t.item() - (1000//num_inference_steps)) #t
        next_t = t # min(999, t.item() + (1000//num_inference_steps)) # t+1
        alpha_t = pipe.scheduler.alphas_cumprod[current_t]
        alpha_t_next = pipe.scheduler.alphas_cumprod[next_t]

        # Inverted update step (re-arranging the update step to get x(t) (new latents) as a function of x(t-1) (current latents)
        latents = (latents - (1-alpha_t).sqrt()*noise_pred)*(alpha_t_next.sqrt()/alpha_t.sqrt()) + (1-alpha_t_next).sqrt()*noise_pred


        # Store
        intermediate_latents.append(latents)

    return torch.cat(intermediate_latents)

# Sample function (regular DDIM)
@torch.no_grad()
def sample(prompt, start_step=0, start_latents=None,
           guidance_scale=8, num_inference_steps=30,
           num_images_per_prompt=1, do_classifier_free_guidance=True,
           negative_prompt='', device=device):

    # Encode prompt
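    # encode_prompt returns (prompt embeddings, negative prompt embeddings)
    prompt_embeds, negative_embeds = pipe.encode_prompt(
            prompt, device, num_images_per_prompt, do_classifier_free_guidance, negative_prompt
    )
    # Negative embeddings go first, matching the chunk(2) order used for guidance
    text_embeddings = torch.cat([negative_embeds, prompt_embeds]) if do_classifier_free_guidance else prompt_embeds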

    # Set num inference steps
    pipe.scheduler.set_timesteps(num_inference_steps, device=device)

    # Create a random starting point if we don't have one already
    if start_latents is None:
        start_latents = torch.randn(1, 4, 64, 64, device=device, dtype=torch.float16)
        start_latents *= pipe.scheduler.init_noise_sigma

    latents = start_latents.clone()

    for i in tqdm(range(start_step, num_inference_steps)):

        t = pipe.scheduler.timesteps[i]

        # Expand the latents if we are doing classifier free guidance
        latent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latents
        latent_model_input = pipe.scheduler.scale_model_input(latent_model_input, t)

        # Predict the noise residual
        noise_pred = pipe.unet(latent_model_input, t, encoder_hidden_states=text_embeddings).sample

        # Perform guidance
        if do_classifier_free_guidance:
            noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
            noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)


        # Normally we'd rely on the scheduler to handle the update step:
        # latents = pipe.scheduler.step(noise_pred, t, latents).prev_sample

        # Instead, let's do it ourselves:
        prev_t = max(1, t.item() - (1000//num_inference_steps)) # t-1
        alpha_t = pipe.scheduler.alphas_cumprod[t.item()]
        alpha_t_prev = pipe.scheduler.alphas_cumprod[prev_t]
        predicted_x0 = (latents - (1-alpha_t).sqrt()*noise_pred) / alpha_t.sqrt()
        direction_pointing_to_xt = (1-alpha_t_prev).sqrt()*noise_pred
        latents = alpha_t_prev.sqrt()*predicted_x0 + direction_pointing_to_xt

    # Post-processing
    images = pipe.decode_latents(latents)
    images = pipe.numpy_to_pil(images)

    return images

Let's try to invert and regenerate an image.  

In [None]:
!wget https://t3.ftcdn.net/jpg/06/15/27/22/360_F_615272248_dBtUrfUopzKlJuWrFYfgY9gK3R5GCcH4.jpg -O image.jpg
input_image = Image.open('image.jpg').resize((512, 512))
input_image

First, let's transform the image into a tensor and compute its latent.

In [None]:
tensor_image = transform(input_image)
# we add a batch dimension, send it to device in fp16 and scale it
tensor_image = torch.unsqueeze(tensor_image, 0).to(device=device, dtype=torch.float16)
tensor_image = tensor_image * 2 - 1
with torch.no_grad():
  latent = vae.encode(tensor_image).latent_dist.sample() * 0.18215

Now let's invert the latent and generate a new image from it.

In [None]:
prompt="a duck with sunglasses at the beach"
inverted_latents = invert(latent, prompt, num_inference_steps=50)
inverted_latents.shape

In [None]:
pipe(prompt, latents=inverted_latents[-1][None], num_inference_steps=50, guidance_scale=3.5).images[0]

We can see some similarities, but it is not quite the same either.  
Our inversion is not perfect.  
Let's try going a bit less deep in the inversion process.  

In [None]:
start_step = 20

sample(prompt, start_latents=inverted_latents[-(start_step+1)][None],
       start_step=start_step, num_inference_steps=50)[0]

Way better right?  
But why is inversion a cool technique?  Inversion allows fine-grained image editing.  
Indeed, by partially inverting the original image and then regenerating it with a modified prompt, we are able to modify specific elements of the image.  
Let's change this duck into a chicken.

In [None]:
start_step = 10
new_prompt = prompt.replace('duck', 'chicken')
sample(new_prompt, start_latents=inverted_latents[-(start_step+1)][None],
       start_step=start_step, num_inference_steps=50)[0]

Pretty cool right?  Try to generate other objects with different inversion levels and try it with another image.

Diffusion models can also be conditioned with other modalities than text.  
For instance it is possible to use another image to condition our model.  
Let's try to generate an image from an image, meaning that we will try to catch some specific features from the original image.  

In [None]:
from diffusers import StableDiffusionImg2ImgPipeline

In [None]:
pipe.scheduler = EulerDiscreteScheduler.from_config(pipe.scheduler.config)
img2img = StableDiffusionImg2ImgPipeline(**pipe.components)

We will generate a new image that should share the same common semantic features as the one with our duck at the beach.
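
The `strength` argument used in the call below controls how much noise is added to the input image, and therefore how many denoising steps are actually run. The sketch below reflects our understanding of how the `diffusers` img2img pipeline derives that number. The function name is ours and the exact rule is an assumption to check against the library source.

```python
def denoising_steps_for_strength(num_inference_steps, strength):
    """Number of denoising steps actually run for a given strength in [0, 1]."""
    init_timestep = min(int(num_inference_steps * strength), num_inference_steps)
    t_start = max(num_inference_steps - init_timestep, 0)  # steps skipped at the start
    return num_inference_steps - t_start


for strength in (0.0, 0.5, 1.0):
    print(strength, denoising_steps_for_strength(50, strength))
```

A strength of 1.0 runs the full schedule and mostly ignores the input image, while a low strength only runs the last few steps and stays close to it.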

In [None]:
result_image = img2img(
    prompt="A baby at the beach",
    image=input_image, # The starting image
    strength=0.6, # 0 for no change, 1.0 for max strength
).images[0]

# View the result
fig, axs = plt.subplots(1, 2, figsize=(12, 5))
axs[0].imshow(input_image); axs[0].set_title('Input Image')
axs[0].axis("off")  # hide the axes of the first subplot
axs[1].imshow(result_image); axs[1].set_title('Result')
axs[1].axis("off")  # hide the axes of the second subplot

Diffusion models can also be used for image inpainting.  
If we are able to provide an image and its mask, we can then ask a diffusion model to inpaint the masked part.

In [None]:
!wget https://raw.githubusercontent.com/CompVis/latent-diffusion/main/data/inpainting_examples/overture-creations-5sI6fQgYIuo.png -O dog.png
!wget https://raw.githubusercontent.com/CompVis/latent-diffusion/main/data/inpainting_examples/overture-creations-5sI6fQgYIuo_mask.png -O mask.png
# Open the two files downloaded above
original_img = Image.open('dog.png').resize((512, 512))
mask_img = Image.open('mask.png').resize((512, 512))

In [None]:
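from diffusers import StableDiffusionInpaintPipeline

inpaint = StableDiffusionInpaintPipeline.from_pretrained("runwayml/stable-diffusion-inpainting")
inpaint = inpaint.to(device)  # move the inpainting pipeline itself to the device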

prompt = "A robot, walking"
image = inpaint(prompt=prompt, image=original_img, mask_image=mask_img).images[0]

# View the result
fig, axs = plt.subplots(1, 3, figsize=(16, 5))
axs[0].imshow(original_img); axs[0].set_title('Input Image'); axs[0].axis("off")
axs[1].imshow(mask_img); axs[1].set_title('Mask'); axs[1].axis("off")
axs[2].imshow(image); axs[2].set_title('Result'); axs[2].axis("off")


That's it for this notebook.  
Now, if you still have time, try to implement the diffusion training on MNIST with the last notebook of the session.