# DreamBooth

This Notebook walks you through implementing DreamBooth using Stable Diffusion. [DreamBooth](https://dreambooth.github.io/) is a project from Google for personalizing the outputs of large text-to-image models. For example, you can fine-tune the generative model on a handful of pictures of your dog and then ask it to generate new images in which your dog is the central character.

<figure>
  <img src="images/dreambooth.png" alt="dreambooth" style="width:100%">
    <figcaption>
      Image taken from <a href="https://dreambooth.github.io/">DreamBooth: Fine Tuning Text-to-Image Diffusion Models for Subject-Driven Generation</a>
    </figcaption>
</figure>

The original DreamBooth project uses the [Imagen](https://imagen.research.google/) text-to-image model. However, given that Imagen is not an open-source model, the wider community has pivoted towards using Stable Diffusion to develop applications akin to DreamBooth. To this end, this tutorial is designed to guide you in creating your own version of DreamBooth, utilizing Stable Diffusion.

Let's import the libraries we need to start the process:

In [None]:
import os
import math
import random

from base64 import b64encode
from functools import partial
from argparse import Namespace

import torch
import torch.nn.functional as F
import numpy as np
import bitsandbytes as bnb
import matplotlib.pyplot as plt

from PIL import Image
from tqdm import tqdm
from IPython.display import HTML
from huggingface_hub import notebook_login
from diffusers import LMSDiscreteScheduler
from diffusers import StableDiffusionPipeline

The subsequent code cell establishes the parameters for the training process. For this instance, you will be working with images of size 512 x 512 x 3. This dimension necessitates considerable GPU memory, and to moderate the memory usage, you have to:

1. Maintain the training batch size at 1, thereby loading only a single image in the memory at any given moment.
1. Configure gradient accumulation to 2, updating the model weights after every two training steps, effectively emulating a training process with a batch size of 2.
1. Utilize mixed-precision training (for more info about this technique, see [here](https://pytorch.org/blog/what-every-user-should-know-about-mixed-precision-training-in-pytorch/)).
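
To make the gradient accumulation point concrete, here is a minimal sketch in plain Python. It uses a made-up one-weight model and made-up data (none of this comes from the training code) to show that accumulating scaled gradients over two single-example steps matches the gradient of one batch of two:

```python
# Toy model: a single weight w, prediction w * x, squared-error loss.
def grad(w, x, y):
    """Gradient of (w * x - y) ** 2 with respect to w."""
    return 2 * (w * x - y) * x

w = 0.5
samples = [(1.0, 2.0), (3.0, 2.0)]  # made-up (x, y) pairs
accumulation_steps = 2

# Batch size 1 with accumulation: scale each per-example gradient and sum.
accumulated = 0.0
for x, y in samples:
    accumulated += grad(w, x, y) / accumulation_steps

# Batch size 2: gradient of the mean loss over both examples.
batched = sum(grad(w, x, y) for x, y in samples) / len(samples)

print(accumulated, batched)
```

Both numbers agree, which is why the weights can be updated once every two steps while only one image sits in memory at a time.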

In addition, you define the text prompt that describes the new concept you want to teach your model. Typically, you look for a rarely used token and link your concept to it (we will talk more about tokens later). The token `sks` is used quite often for this purpose. Because the token is so uncommon, tying it to your concept will not impair the model's ability to generalize. The prompt then follows the pattern `A photo of sks <class>`, where `<class>` depends on the nature of your concept. For instance, if you are teaching the model a human face, the class can be `person`. This Notebook uses the variant `A portrait of sks person`. Our model will discard any previous meaning associated with the token `sks`, but this is not a concern for our purposes.

Finally, you also set the repository where the pre-trained Stable Diffusion model lives. The model is hosted on Hugging Face Hub. To be able to download it, you should:

1. Create an account with [Hugging Face](https://huggingface.co/).
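1. Go to the [model card](https://huggingface.co/CompVis/stable-diffusion-v1-4) and accept the terms. The configuration below sets `MODEL_NAME` to `stabilityai/stable-diffusion-2-1`, so check the card of that model as well.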
1. Go to your settings and create an access token with `read` permissions (if you want to upload and store your fine-tuned model on Hugging Face Hub, create an access token with `write` permissions instead).
1. Uncomment and run the following cell to log in with your access token.

In [None]:
# import os; os.environ["HTTPS_PROXY"] = "http://hpeproxy.its.hpecorp.net:80"
# notebook_login()

In [None]:
IMG_SIZE = 512
LEARNING_RATE = 6e-06
MAX_TRAINING_STEPS = 450
TRAIN_BATCH_SIZE = 1
GRADIENT_ACCUMULATION = 2
MAX_GRAD_NORM = 1.0
INFERENCE_STEPS = 25
GUIDANCE_SCALE = 7.5
REVISION = "fp16"
OUTPUT_DIR = "dreambooth-concept"
MODEL_NAME = "stabilityai/stable-diffusion-2-1"
CONCEPT_PROMPT = "A portrait of sks person"
CONCEPT_DATA_PATH = "/home/dimpo/ezua-tutorials/E2E-Demos/DreamBooth/concept"

In [None]:
args = Namespace(
    model_name=MODEL_NAME,
    img_size=IMG_SIZE,
    concept_data_path=CONCEPT_DATA_PATH,
    concept_prompt=CONCEPT_PROMPT,
    learning_rate=LEARNING_RATE,
    max_train_steps=MAX_TRAINING_STEPS,
    train_batch_size=TRAIN_BATCH_SIZE,
    gradient_accumulation = GRADIENT_ACCUMULATION,
    max_grad_norm=MAX_GRAD_NORM,
    revision=REVISION,
    gradient_checkpointing=True,
    eight_bit_optimizer=True,
    output_dir=OUTPUT_DIR,
    seed=42,
    save_concept=False
)

## The CLIP Tokenizer

We are now ready to delve into the functionality of the Stable Diffusion model. Comprised of three components, Stable Diffusion includes:

1. A U-Net model
1. A CLIP encoder model
1. A Variational Autoencoder model

Let's take things one step at a time and explore the [CLIP](https://openai.com/blog/clip/) encoder first. Stable Diffusion employs a CLIP encoder to convert the words in our prompt into semantically meaningful vectors. We'll divide this encoding process into two stages: i) the tokenization stage and ii) the encoding stage.

The preliminary part of the process is termed tokenization. The CLIP tokenizer functions as a dictionary that associates a word with a specific integer ID. Let's examine how this operates with our given prompt:

In [None]:
from transformers import CLIPTokenizer

def get_clip_tokenizer(model_name: str, revision: str) -> CLIPTokenizer:
    """Get the CLIP tokenizer.

    Args:
        model_name (str): The name of the pretrained model to load.
        revision (str): The revision of the model to load.

    Returns:
        CLIPTokenizer: The CLIP tokenizer.
    """
    tokenizer = CLIPTokenizer.from_pretrained(
        model_name, subfolder="tokenizer", revision=revision)
    return tokenizer

In [None]:
# load the CLIP tokenizer
clip_tokenizer = get_clip_tokenizer(args.model_name, args.revision)
# return the tokens as PyTorch tensors
tokens = clip_tokenizer(CONCEPT_PROMPT, return_tensors="pt")

In [None]:
print(f"Concept prompt: '{CONCEPT_PROMPT}'")
print(f"Tokens: {tokens}")

You see that the tokenizer turned the prompt "A portrait of sks person" into a sequence of integers:

`49406, 320, 5352, 539, 48136, 2533, 49407`

You observe that we have `7` tokens for `5` words. Why is that? Furthermore, what is this `attention_mask` key you see there? First, let's convert each ID back to a word, so you can examine which word each token corresponds to:

In [None]:
for token in tokens["input_ids"][0]:
    print(f"Token ID: {token}\t Word: {clip_tokenizer.decoder.get(int(token))}")

We see that the CLIP tokenizer reserves IDs `49406` and `49407` to indicate the start and end of text sequences. We also see that each word has a special end-of-word suffix (`</w>`). This special suffix is used in some tokenization methods as a way to indicate the end of a word.
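
The mapping can be mimicked with a plain dictionary. The sketch below is a toy, not the real CLIP tokenizer: `TOY_VOCAB` and `toy_tokenize` are hypothetical names, and the vocabulary contains only the IDs shown above, paired with the words of the prompt by position. A real tokenizer can also split rare words into several sub-word pieces, which this toy ignores.

```python
# Toy vocabulary holding only the IDs printed above (not the real CLIP vocab).
TOY_VOCAB = {
    "<|startoftext|>": 49406,
    "a</w>": 320,
    "portrait</w>": 5352,
    "of</w>": 539,
    "sks</w>": 48136,
    "person</w>": 2533,
    "<|endoftext|>": 49407,
}

def toy_tokenize(prompt):
    """Lowercase, split on whitespace, and wrap the IDs in start/end markers."""
    words = prompt.lower().split()
    ids = [TOY_VOCAB["<|startoftext|>"]]
    ids += [TOY_VOCAB[word + "</w>"] for word in words]
    ids.append(TOY_VOCAB["<|endoftext|>"])
    return ids

print(toy_tokenize("A portrait of sks person"))
```

Five words plus the two reserved markers give the seven IDs.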

Now, what is this `attention_mask` list? This mask is a list of integers that the CLIP encoder uses down the road. It informs the model about the words in the sequence it needs to attend to. So, if we had used padding to change the length of this sequence, the `attention_mask` would have zeros in the padding positions to inform the model that it does not have to consider the padding tokens when making predictions. Let's do that:

In [None]:
padded_tokens = clip_tokenizer(CONCEPT_PROMPT, padding="max_length",
                               max_length=clip_tokenizer.model_max_length,
                               return_tensors="pt")

print(f"Max sequence length the tokenizer can handle: {clip_tokenizer.model_max_length}")
print(f"Concept prompt: '{CONCEPT_PROMPT}'")
print(f"Tokens: {padded_tokens}")

The tokenizer can handle sequences of up to 77 tokens. When padding is required, it fills the remaining positions with a padding token, and the attention mask contains a continuous run of zeros at those positions. Conversely, a prompt longer than 77 tokens (tokens, not words) has to be truncated to fit.
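
The relationship between padding and the attention mask can be sketched in a few lines of plain Python. `pad_to_max_length` is a hypothetical helper, and `pad_id=0` is a placeholder, since the real padding ID depends on the tokenizer's configuration. The sketch also truncates by simply cutting the list, whereas a real tokenizer takes care of the special tokens.

```python
def pad_to_max_length(input_ids, max_length, pad_id):
    """Truncate or right-pad a list of token IDs and build the attention mask."""
    ids = input_ids[:max_length]
    mask = [1] * len(ids)           # 1 = real token, attend to it
    n_pad = max_length - len(ids)
    return ids + [pad_id] * n_pad, mask + [0] * n_pad  # 0 = padding, ignore it

prompt_ids = [49406, 320, 5352, 539, 48136, 2533, 49407]
padded_ids, attention_mask = pad_to_max_length(prompt_ids, max_length=77, pad_id=0)

print(len(padded_ids), sum(attention_mask))
```

The padded sequence has 77 entries, but the mask marks only the first 7 as real tokens.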

Now, we have a way to convert our words into numbers. Let's move to the encoding part.

## The CLIP Encoder

Stable Diffusion utilizes CLIP to map our text prompts into a multidimensional space. Simply put, it transforms each word in our prompt into vectors, infusing them with meaning for the model. These vectors are referred to as "embeddings" and we separately train a CLIP model to generate them. For further insight on this procedure, you may read through this [blog](https://openai.com/blog/clip/) by OpenAI.

For our application, it is unnecessary to re-train or fine-tune this component during DreamBooth's training process, as it is already proficient in translating each word in our text prompt into substantial vectors. Let's observe this operation in action.

In [None]:
from transformers import CLIPTextModel

def get_clip_encoder(model_name: str, revision: str) -> CLIPTextModel:
    """Get the CLIP encoder model.

    Args:
        model_name (str): The name of the pretrained model to load.
        revision (str): The revision of the model to load.

    Returns:
        CLIPTextModel: The CLIP encoder model.
    """
    text_encoder = CLIPTextModel.from_pretrained(
        model_name, subfolder="text_encoder", revision=revision)
    return text_encoder

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# load the CLIP encoder model
clip_encoder = get_clip_encoder(args.model_name, args.revision).to(device)
# embed the text prompt token ids
with torch.no_grad():
    text_embeddings = clip_encoder(padded_tokens.input_ids.to(device))[0]

print(f"Embedding shape: {text_embeddings.shape}")
print(f"Embedding of the word 'portrait' (first five elements): {text_embeddings[0][2][:5]}")

As observed, the CLIP tokenizer transformed our text prompt into a sequence of integer token IDs (`7` for the prompt itself, padded to `77`). The CLIP encoder then mapped each ID to a vector of `1024` dimensions, which gives the `1 x 77 x 1024` shape printed above. These vectors condition the U-Net during image generation.

## The Variational Autoencoder

Stable Diffusion is a Latent Diffusion Model (LDM). To understand how it works, let's first understand how Diffusion Models learn to generate images. The following steps describe the training process of a diffusion model. Given an image from a training set, a Diffusion Model:

1. Adds noise to the image iteratively, until the image information is lost (this is called the forward diffusion process)
1. Learns the reverse process: take the noisy image and try to reconstruct the original one

<figure>
  <img src="images/diffusion.png" alt="diffusion" style="width:100%">
    <figcaption>
      image taken from: Ho, J., Jain, A. and Abbeel, P., 2020. Denoising diffusion probabilistic models. Advances in Neural Information Processing Systems, 33, pp.6840-6851.
    </figcaption>
</figure>

Once the model grasps the inverse procedure (the denoising process), we can introduce random noise as input, enabling it to generate realistic-looking images. Subsequently, we can direct the thematic concept of the generated image using a textual prompt.

The complication arises from the fact that the model operates within the pixel space, and processing `512 x 512 x 3` images can render the process rather sluggish. How can we improve this? We can compress the image into a latent (or hidden) representation, retain the properties that define the image, and carry out the training process within this latent space. This space is termed latent (or hidden) because the meaning of its dimensions remains unknown to us. We treat it as a black box; we inform the model of a space encompassing (for example) `16000` dimensions and grant it the liberty to manipulate it as desired.

The Variational Autoencoder (VAE) does exactly this: it compresses the image into a latent space but keeps the characteristics that make the image what it is. That's why it can later decompress the latent back into the original image with great accuracy. You can think of a VAE as a lossy compression algorithm.

Stable Diffusion uses a VAE to compress the original image into a latent space. That's why Stable Diffusion falls into the category of LDMs. Usually, the encoder component of the autoencoder compresses a `512 x 512 x 3` image to `64 x 64 x 4`, which is a `48x` reduction. Then, the decoder knows how to take a `64 x 64 x 4` latent representation and reconstruct the original image. Working on the latent representation of the image during the forward diffusion and denoising steps makes things an order of magnitude faster.
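
The `48x` figure follows directly from the two shapes. A quick check in plain Python:

```python
from math import prod

image_shape = (512, 512, 3)   # height, width, RGB channels
latent_shape = (64, 64, 4)    # height, width, latent channels

ratio = prod(image_shape) / prod(latent_shape)
print(prod(image_shape), prod(latent_shape), ratio)  # 786432 16384 48.0
```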

We do not need to re-train or fine-tune this component during the training process of DreamBooth, as it already knows how to compress an image to a latent space effectively. However, let's see the autoencoder in action. First, let's load the image that contains the concept we want to teach our model:

In [None]:
# make sure the image has exactly three (RGB) channels, as the VAE expects
image = Image.open(os.path.join("images", "dog.png")).convert("RGB")
image

Let's convert the image to a PyTorch tensor and pass it through the autoencoder:

In [None]:
from diffusers import AutoencoderKL

def get_vae(model_name: str, revision: str) -> AutoencoderKL:
    """Get the VAE model.

    Args:
        model_name (str): The name of the pretrained model to load.
        revision (str): The revision of the model to load.

    Returns:
        AutoencoderKL: The VAE model.
    """
    vae = AutoencoderKL.from_pretrained(
        model_name, subfolder="vae", revision=revision)
    return vae

In [None]:
import torchvision.transforms as transforms

# convert image to a PyTorch tensor
transform_func = transforms.Compose([transforms.ToTensor()])
tensor_image = transform_func(image)

# encode the image with the VAE encoder
vae = get_vae(args.model_name, args.revision).to(device)
with torch.no_grad():
    latent = vae.encode(tensor_image.unsqueeze(0).to(device) * 2 - 1)
    latent = 0.18215 * latent.latent_dist.sample()

print(f"Latent shape: {latent.shape}")

The autoencoder compressed the `512 x 512 x 3` image to a `64 x 64 x 4` latent representation. The unit dimension at the beginning indicates the batch size, and PyTorch places the channel dimension before the spatial ones, so with a single image the printed shape is `1 x 4 x 64 x 64`. Let's visualize the four channels of the latent:

In [None]:
_, ax = plt.subplots(1, 4, figsize=(16, 4))

for c in range(4):  # display each channel separately
    ax[c].imshow(latent.cpu()[0][c], cmap='Greys')

Since each channel of the latent is a matrix, we can visualize it as an image. The encoder downsampled the initial image quite a bit. However, even though the channels look like images, we cannot be sure which characteristics of the original image each pixel or channel encodes. The original image is now embedded into a latent space.

Finally, let's decompress the image and get back the uncompressed one:

In [None]:
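# undo the scaling applied after encoding; use a separate variable
# so that `latent` stays scaled for the cells that follow
unscaled_latent = (1 / 0.18215) * latent

# decompress the image with the VAE decoder
with torch.no_grad():
    tensor_image = vae.decode(unscaled_latent).sample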

tensor_image = (tensor_image / 2 + 0.5).clamp(0, 1)
tensor_image = tensor_image.detach().permute(0, 2, 3, 1).squeeze().cpu().numpy()

tensor_image = (tensor_image * 255).round().astype("uint8")
reconstruction = Image.fromarray(tensor_image)
reconstruction

Even if you squint you can't really tell how the reconstructed image differs from the original one! So let's compare the original image with the reconstruction side by side:

In [None]:
def image_grid(imgs, rows, cols):
    """Create a grid of images"""
    width, height = imgs[0].size
    grid = Image.new('RGB', size=(cols * width, rows * height))

    for i, img in enumerate(imgs):
        grid.paste(img, box=(i % cols * width, i // cols * height))

    return grid

In [None]:
from PIL import ImageChops

diff = ImageChops.difference(image, reconstruction)
image_grid([image, reconstruction, diff], rows=1, cols=3)

As observed, the difference is negligible.

## The U-Net

The final piece of Stable Diffusion is the U-Net model. The goal of the U-Net is to learn the reverse function of the forward diffusion process. It does this by predicting the amount of noise we added to the latent representation of the image at each step of the forward diffusion process. Then, we can subtract the predicted noise from the noisy latent to get the original one.
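
The idea of adding noise and then subtracting a prediction of it can be sketched on a handful of numbers. Everything below is made up for illustration: the "latent" is a short list, `sigma` is an arbitrary noise level, and the "U-Net" is replaced by a stand-in that happens to predict the noise perfectly.

```python
import random

random.seed(0)
latent = [0.3, -1.2, 0.8, 0.1]   # made-up latent values
sigma = 2.0                      # made-up noise level
noise = [random.gauss(0.0, 1.0) for _ in latent]

# forward diffusion: noisy = original + sigma * noise
noisy = [x + sigma * n for x, n in zip(latent, noise)]

# stand-in for the U-Net: pretend it predicts the added noise exactly
predicted_noise = noise

# denoising: original = noisy - sigma * predicted noise
recovered = [z - sigma * n for z, n in zip(noisy, predicted_noise)]
print(recovered)
```

With a perfect prediction the original values come back up to floating-point error. A real U-Net only approximates the noise, which is one reason the denoising is spread over many small steps.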

<figure>
  <img src="images/u-net.png" alt="u-net" style="width:50%">
    <figcaption>
      The U-Net architecture first proposed in Ronneberger, O., Fischer, P. and Brox, T., 2015, October. U-net: Convolutional networks for biomedical image segmentation. In International Conference on Medical image computing and computer-assisted intervention (pp. 234-241). Springer, Cham.
    </figcaption>
</figure>

However, instead of performing the denoising operation in one step, we do this iteratively, removing a small amount of noise at each step. This tends to produce more accurate results, as the model does not have a lot to work with when starting from random noise. Later in the Notebook we'll see how that process works, but first, we need a dataset.

## Create the dataset

Our next step is to create the examples we'll use during training. An example has two parts: i) the image and ii) the text prompt. Then, we need a function to stack our examples on top of each other and create batches. That is what the `_collate_fn` is doing.

In [None]:
from pathlib import Path
from torch.utils.data import Dataset

class DreamBoothDataset(Dataset):
    """The DreamBooth dataset.
    
    This dataset loads the concept images from the concept data path and
    creates training examples by combining the concept images with the
    concept prompt.

    Args:
        concept_data_path (str): The path to the concept images.
        concept_prompt (str): The concept prompt to use.
        tokenizer (CLIPTokenizer): The CLIP tokenizer.
        img_size (int, optional): The image size to use. Defaults to 512.
    
    Attributes:
        transforms (torchvision.transforms.Compose): The image transformation
            composition.
    """
    def __init__(self, concept_data_path: str, concept_prompt: str,
                 tokenizer: CLIPTokenizer, img_size: int = 512):
        self.tokenizer = tokenizer

        self.concept_prompt = concept_prompt
        self.concept_images = [fn for fn in Path(concept_data_path).iterdir()
                               if str(fn).lower().endswith(('.jpg', '.jpeg'))]

        self.image_transforms = transforms.Compose([
                transforms.Resize(img_size, 
                    interpolation=transforms.InterpolationMode.BILINEAR),
                transforms.CenterCrop(img_size),
                transforms.ToTensor(),
                transforms.Normalize([0.5], [0.5])])

    def __len__(self):
        return len(self.concept_images)

    def __getitem__(self, index):
        example = {}

        # create the concept images part of the example
        concept_image = Image.open(
            self.concept_images[index % len(self.concept_images)])

        if concept_image.mode != "RGB":
            concept_image = concept_image.convert("RGB")

        example["concept_images"] = self.image_transforms(concept_image)

        # create the concept prompt part of the example
        concept_prompt_tokens = self.tokenizer(
            self.concept_prompt, padding="do_not_pad", truncation=True,
            max_length=self.tokenizer.model_max_length).input_ids

        example["concept_prompt_tokens"] = concept_prompt_tokens
        
        return example

In [None]:
def _collate_fn(examples, tokenizer):
    # for each example, extract the text prompts and the images
    input_ids = [example["concept_prompt_tokens"] for example in examples]
    pixel_values = [example["concept_images"] for example in examples]

    # stack the pixel values vertically
    pixel_values = torch.stack(pixel_values)
    pixel_values = pixel_values.contiguous().float()

    # pad the text prompt tokens
    input_ids = tokenizer.pad(
        {"input_ids": input_ids}, padding=True, return_tensors="pt").input_ids

    # return a batch of padded tokens and images
    batch = {"input_ids": input_ids,
             "pixel_values": pixel_values}

    return batch

collate_fn = partial(_collate_fn, tokenizer=clip_tokenizer)

The `Dataset` defines what each example consists of. In our case, one example consists of an image and a tokenized prompt. Finally, the `DataLoader` retrieves a few examples from the dataset and passes them to the collate function to create batches. It does this in a lazy manner and in our case, we've set the batch size to `1`, so the `DataLoader` tells the collate function to stack just one image.
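
The division of labour between the dataset, the collate function, and the loader can be sketched without PyTorch. `toy_dataloader` and `toy_collate` are hypothetical stand-ins that only mimic the behaviour described above: examples are fetched and grouped lazily, one batch at a time.

```python
def toy_dataloader(dataset, batch_size, collate_fn):
    """Yield collated batches one at a time (lazily)."""
    for start in range(0, len(dataset), batch_size):
        stop = min(start + batch_size, len(dataset))
        examples = [dataset[i] for i in range(start, stop)]
        yield collate_fn(examples)

def toy_collate(examples):
    """Group the fields of several examples into one batch dictionary."""
    return {
        "images": [ex["image"] for ex in examples],
        "prompts": [ex["prompt"] for ex in examples],
    }

# three placeholder examples; strings stand in for image tensors
toy_dataset = [{"image": f"img_{i}", "prompt": "A portrait of sks person"}
               for i in range(3)]

batches = list(toy_dataloader(toy_dataset, batch_size=1, collate_fn=toy_collate))
print(len(batches), batches[0]["images"])
```

With a batch size of `1`, three examples produce three batches, each holding a single image.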

In [None]:
train_dataset = DreamBoothDataset(
        concept_data_path=args.concept_data_path,
        concept_prompt=args.concept_prompt,
        tokenizer=clip_tokenizer,
        img_size=args.img_size)

train_dataloader = torch.utils.data.DataLoader(
        train_dataset, batch_size=args.train_batch_size,
        shuffle=False, collate_fn=collate_fn)

Let's get one of our examples.

In [None]:
example = next(iter(train_dataloader))

Let's first examine the size of the image that the example contains:

In [None]:
# from the example dictionary get the image batch
img = example["pixel_values"]
# the image is normalized to have values between -1 and 1
# rescale it, so the min value is `0` and the max `1`
img = (img / 2 + 0.5).clamp(0, 1)
# remove the batch dimension and move the channel dimension last
img = img.squeeze().permute(1, 2, 0).numpy()

print(f"Image size: {img.shape}")

Our data is what we expected it to be: a `512 x 512 x 3` image. Now, let's see what our image looks like:

In [None]:
# rescale image to [0...255]
img = (img * 255).round().astype("uint8")
# load and display the image
Image.fromarray(img)

The image now looks a bit different from the original image we saw earlier. This is because the Dataset applies a series of transformations to the original image to make it easier to work with:

- Resizes it so its shorter side matches the target image size
- Center crops it so it is a square image
- Normalizes the image so each pixel takes values in range [-1, 1]

## The Scheduler

Let's now take a look at the forward diffusion process we mentioned earlier. During this process, we iteratively add noise to the image until it becomes just random noise. But how do we do that? What does "iteratively" mean?

Let's see what the graph of the scheduler looks like:

In [None]:
scheduler = LMSDiscreteScheduler(
    beta_start=0.00085, beta_end=0.012,
    beta_schedule="scaled_linear")

plt.plot(scheduler.sigmas)
plt.title("Noise Schedule")
plt.xlabel("Sampling step")
plt.ylabel(r"$\sigma$")  # raw string, so `\s` is not read as an escape sequence
plt.show()

The scheduler is a function that accepts a sampling step and returns the standard deviation that we use to scale the noise. In plain English, it decides the amount of noise we add to the image at each sampling step. From the graph, we can see that at step `0` we add a lot of noise to the image, and at step `1000` we add just a little.
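
To get a feel for where these standard deviations come from, here is a minimal sketch in plain Python. It assumes the usual definition of a `scaled_linear` schedule (betas spaced linearly in their square roots) and the relation `sigma = sqrt((1 - alpha_cumprod) / alpha_cumprod)`. `scaled_linear_sigmas` is a hypothetical helper, not a function of the scheduler class, so treat the numbers as an approximation of what the plot shows.

```python
import math

def scaled_linear_sigmas(beta_start=0.00085, beta_end=0.012, num_steps=1000):
    """Noise levels for a scaled-linear beta schedule, least noisy first."""
    lo, hi = math.sqrt(beta_start), math.sqrt(beta_end)
    sigmas, alpha_cumprod = [], 1.0
    for i in range(num_steps):
        # betas are linear in sqrt-space, then squared
        beta = (lo + (hi - lo) * i / (num_steps - 1)) ** 2
        alpha_cumprod *= 1.0 - beta
        sigmas.append(math.sqrt((1.0 - alpha_cumprod) / alpha_cumprod))
    return sigmas

sigmas = scaled_linear_sigmas()
print(f"smallest sigma: {sigmas[0]:.4f}, largest sigma: {sigmas[-1]:.4f}")
```

The list here runs from the least to the most noisy level. The plot above shows the values in sampling order, that is, starting from the largest.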

In general, the training process follows the steps below:

1. Take an image from the dataset
1. Compress the image to a latent representation
1. Sample a random integer to indicate the timestep
1. Pass the timestep to the scheduler and get the amount of noise to add to the latent
1. Add the noise to the latent (forward diffusion process)
1. Pass the noisy latent through the U-Net and get a prediction of the noise that was added

Let's see what our original image would look like if we added the amount of noise that the scheduler returns for timestep `600`:

In [None]:
# sample random noise with the same shape as the latent
noise = torch.randn_like(latent)
# set timestep to `600`
timestep = 600
# add noise to the latent
# the scheduler scales the random noise by sigma and adds it to the latent
# noisy_latent = latent + noise * sigma
noisy_latent = scheduler.add_noise(
    latent, noise, timesteps=torch.tensor([scheduler.timesteps[timestep]]))

Let's visualize the result:

In [None]:
def latent_to_image(latent: torch.Tensor,
                    vae: torch.nn.Module = None) -> Image.Image:
    """Visualize a latent as a PIL image.
    
    Args:
        latent (torch.Tensor): The latent representation.
        vae (torch.nn.Module, optional): The VAE whose decoder maps the
            latent back to pixel space. If None, the latent is treated
            as an image tensor. Defaults to None.
    
    Returns:
        Image.Image: A PIL image.
    """
    if vae is not None:
        latent = (1 / 0.18215) * latent

        # decompress the image with the VAE decoder
        with torch.no_grad():
            tensor_image = vae.decode(latent).sample
    else:
        tensor_image = latent

    tensor_image = (tensor_image / 2 + 0.5).clamp(0, 1)
    tensor_image = (tensor_image.detach()
                                .permute(0, 2, 3, 1)
                                .squeeze()
                                .cpu()
                                .numpy())
    tensor_image = (tensor_image * 255).round().astype("uint8")
    return Image.fromarray(tensor_image)

In [None]:
noisy_image = latent_to_image(noisy_latent, vae)
noisy_image

## Training & Inference

Let's load the U-Net and set it to training mode. As we said before, this is the only model we need to fine tune.

In [None]:
from diffusers import UNet2DConditionModel


def get_unet(model_name: str, revision: str,
             conditioned: bool = True) -> UNet2DConditionModel:
    """Get the UNet model.

    Args:
        model_name (str): The name of the pretrained model to load.
        revision (str): The revision of the model to load.
        conditioned (bool, optional): Whether to load the conditioned UNet
            model. Defaults to True.

    Returns:
        UNet2DConditionModel: The UNet model.
    """
    return UNet2DConditionModel.from_pretrained(
        model_name, subfolder="unet", revision=revision)


In [None]:
unet = get_unet(args.model_name, args.revision).to(device)
unet = unet.train()
unet.enable_gradient_checkpointing()  # enable gradient checkpointing to reduce the GPU memory consumption

For the optimizer, we'll use an `8bit` implementation of [AdamW](https://pytorch.org/docs/stable/generated/torch.optim.AdamW.html). We use this specific implementation to reduce the memory footprint of the optimizer on the GPU. You can learn more about `8bit` optimizers by watching this [presentation](https://youtu.be/IxrlHAJtqKE) by the creator of the `bitsandbytes` library, Tim Dettmers. 

Finally, we pass only the parameters of the U-Net model, so the optimizer can only mess with these parameters.

In [None]:
optimizer = bnb.optim.AdamW8bit(unet.parameters(), lr=args.learning_rate)

Let's get an example and walk through the inference process for one step. This is just for demonstration. Later, we'll kick-off a real training process on a GPU device.

In [None]:
imgs, input_ids = example["pixel_values"], example["input_ids"]
imgs, input_ids = imgs.to(device), input_ids.to(device)

Compress the image to a latent representation:

In [None]:
with torch.no_grad():
    # scale the latent by the same factor that `latent_to_image` divides by
    latent = 0.18215 * vae.encode(imgs).latent_dist.sample()

Add noise to the latent:

In [None]:
noise = torch.randn(latent.shape).to(device)

scheduler.set_timesteps(INFERENCE_STEPS)
noisy_latent = scheduler.add_noise(latent, noise, torch.tensor([scheduler.timesteps[1]]))

Visualize the four channels of the noisy latent:

In [None]:
_, ax = plt.subplots(1, 4, figsize=(16, 4))
for c in range(4):  # display each channel separately
    ax[c].imshow(noisy_latent.cpu()[0][c], cmap='Greys')

At this point we can use the scheduler and the prediction of the U-Net to compute the previous latent sample. Let's put all of this in a loop, to see what our model knows about the `sks person` we use in our text prompt.
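
The loop that follows combines two noise predictions, one made without the prompt and one made with it. This technique is usually called classifier-free guidance. The sketch below applies the same formula to short lists of made-up numbers; `apply_guidance` is a hypothetical helper, not part of any library.

```python
def apply_guidance(noise_uncond, noise_text, guidance_scale):
    """Push the prediction away from the unconditional one, towards the prompt."""
    return [u + guidance_scale * (t - u)
            for u, t in zip(noise_uncond, noise_text)]

noise_uncond = [0.10, -0.20, 0.05]   # made-up prediction for the empty prompt
noise_text = [0.30, -0.10, 0.00]     # made-up prediction for the real prompt

print(apply_guidance(noise_uncond, noise_text, guidance_scale=1.0))
print(apply_guidance(noise_uncond, noise_text, guidance_scale=7.5))
```

A scale of `1.0` simply returns the text-conditioned prediction, while larger values exaggerate the difference between the two and make the result follow the prompt more closely.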

In [None]:
os.makedirs("steps", exist_ok=True)

uncond_input = clip_tokenizer(
    [""] * args.train_batch_size, padding="max_length",
    max_length=clip_tokenizer.model_max_length, return_tensors="pt"
)
with torch.no_grad():
    uncond_embeddings = clip_encoder(uncond_input.input_ids.to(device))[0]

text_embeddings_concat = torch.cat([uncond_embeddings, text_embeddings])

for i, t in enumerate(tqdm(scheduler.timesteps)):
    input_latent = torch.cat([noisy_latent] * 2)
    input_latent = scheduler.scale_model_input(input_latent, t)

    # predict the noise residual
    noise_pred = unet(input_latent, t, encoder_hidden_states=text_embeddings_concat).sample
        
    # perform guidance
    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
    noise_pred = noise_pred_uncond + GUIDANCE_SCALE * (noise_pred_text - noise_pred_uncond)

    # compute the previous noisy sample x_t -> x_t-1
    noisy_latent = scheduler.step(noise_pred, t, noisy_latent).prev_sample
    
    # save the result of step i
    generated_image = latent_to_image(noisy_latent, vae)
    generated_image.save(f'steps/{i:04}.jpeg')

We can now create a sequence of the generated images to observe the denoising process. You can see that the model starts with a very noisy image, where you can barely recognize the shape of a human face. It tries to reconstruct our original image but does a bad job, since it's not yet trained to associate the word `sks` with the face we want.

In [None]:
!ffmpeg -v 1 -y -f image2 -framerate 1 -i steps/%04d.jpeg -c:v libx264 -preset slow -qp 18 -pix_fmt yuv420p out.mp4

mp4 = open('out.mp4','rb').read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()

HTML("""
<video width=600 controls>
      <source src="%s" type="video/mp4">
</video>
""" % data_url)

So, let's use the standard training procedure to update the weights of the U-Net, so it can do a bit of a better job next time. First, we need a loss function. The loss function will inform the model how off it is. Since the model predicts the noise we added to each step, we can compare this prediction with the actual noise we added.
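
As a reminder of what the mean squared error measures, here is a sketch on plain Python lists. The numbers are made up, and `mse_loss` here is a hand-written stand-in for the PyTorch function used in the next cell.

```python
def mse_loss(prediction, target):
    """Average of the squared differences between prediction and target."""
    return sum((p - t) ** 2 for p, t in zip(prediction, target)) / len(target)

actual_noise = [0.5, -1.0, 0.25, 2.0]   # made-up noise values
good_guess = [0.4, -0.9, 0.30, 1.8]     # close to the actual noise
bad_guess = [-1.0, 1.0, 2.00, 0.0]      # far from the actual noise

print(f"good guess: {mse_loss(good_guess, actual_noise):.4f}")
print(f"bad guess:  {mse_loss(bad_guess, actual_noise):.4f}")
```

The closer the predicted noise is to the noise that was actually added, the smaller the loss.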

In [None]:
loss = F.mse_loss(noise_pred, noise)
print(f"Loss: {loss:.2f}")

Next, we run the backward propagation algorithm to assign each model weight its part of the blame.

In [None]:
loss.backward()

Finally, we can use the optimizer to update the model's weights.

In [None]:
optimizer.step()

Let's now fine-tune Stable Diffusion on our custom dataset. At this point your GPU device may be running out of memory. To run the following cell successfully, restart the Notebook and run the "imports" cell, as well as the two cells that define the `args` variable.

In [None]:
from booth.train import train_dreambooth

train_dreambooth(args)

When the training process completes (it takes `~10` minutes on a Tesla P100), the booth helper library will save the trained model to a `dreambooth-concept` folder. We can load it and generate our own images. Let's see it in action:

In [None]:
# for reproducibility
torch.manual_seed(1918918)

pipe = StableDiffusionPipeline.from_pretrained("dreambooth-concept", torch_dtype=torch.float16).to("cuda")

In [None]:
prompt = "a portrait of sks man in game of thrones"
images = pipe(prompt, guidance_scale=13, num_images_per_prompt=2).images

image_grid(images, rows=1, cols=2)

Finally, if you want to store the model to Hugging Face Hub, run the cell below. It will create a new private repository called `dreambooth-frodo` under your account. You can then reference it as `<username>/dreambooth-frodo`.

> Note that you need an access token with `write` permissions for this step to work.

In [None]:
path_to_model = "dreambooth-concept"
repo_name = "dreambooth-frodo"

upload_to_huggingface(path=path_to_model,
                      name=repo_name,
                      prompt=CONCEPT_PROMPT)