In [2]:
#  The MIT License (MIT)
#
#  Copyright (c) 2015-2024 Advanced Micro Devices, Inc. All rights reserved.
#
#  Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the 'Software'), to deal
#  in the Software without restriction, including without limitation the rights
#  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
#  copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
#  The above copyright notice and this permission notice shall be included in
#  all copies or substantial portions of the Software.
#
#  THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
#  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
#  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
#  THE SOFTWARE.

# Stable Diffusion 2.1

The following example will show how to run `Stable Diffusion 2.1` with `MIGraphX`.

Install the required dependencies.

In [None]:
# Install dependencies
!pip install optimum[onnxruntime] transformers diffusers accelerate

We will use optimum to generate the onnx files.

In [None]:
# export models
!optimum-cli export onnx --model stabilityai/stable-diffusion-2-1 models/sd21-onnx

Now it is time to load these models with python.

First, we make sure that MIGraphX module is found in the python path.

In [None]:
import sys
mgx_lib_path = "/opt/rocm/lib/" # or "/code/AMDMIGraphX/build/lib/"
if mgx_lib_path not in sys.path:
    sys.path.append(mgx_lib_path)
import migraphx as mgx

Next, a helper method to load and cache the models.

This will use the `models/sd21-onnx` path. If you changed it, make sure to update here as well.

In [None]:
import os
# helper for model loading
def load_mgx_model(name, shapes):
    file = f"models/sd21-onnx/{name}/model"
    print(f"Loading {name} model from {file}")
    if os.path.isfile(f"{file}.mxr"):
        print(f"Found mxr, loading it...")
        model = mgx.load(f"{file}.mxr", format="msgpack")
    elif os.path.isfile(f"{file}.onnx"):
        print(f"Parsing from onnx file...")
        model = mgx.parse_onnx(f"{file}.onnx", map_input_dims=shapes)
        model.compile(mgx.get_target("gpu"))
        print(f"Saving {name} model to mxr file...")
        mgx.save(model, f"{file}.mxr", format="msgpack")
    else:
        print(f"No {name} model found. Please verify the path is correct and re-try, or re-download model.")
        sys.exit(1)
    return model

With that, we can load the models. This could take several minutes.

In [None]:
text_encoder = load_mgx_model("text_encoder", {"input_ids": [1, 77]})

In [None]:
unet = load_mgx_model(
        "unet", {
            "sample": [1, 4, 64, 64],
            "encoder_hidden_states": [1, 77, 1024],
            "timestep": [1],
        })

In [None]:
vae = load_mgx_model("vae_decoder", {"latent_sample": [1, 4, 64, 64]})

Import the remaining packages.

In [None]:
from diffusers import EulerDiscreteScheduler
from transformers import CLIPTokenizer
import torch
import numpy as np
from tqdm.auto import tqdm
from PIL import Image

Time to load the scheduler and tokenizer from the original source.

In [None]:
model_id = "stabilityai/stable-diffusion-2-1"
scheduler = EulerDiscreteScheduler.from_pretrained(model_id,
                                                   subfolder="scheduler")
tokenizer = CLIPTokenizer.from_pretrained(model_id, subfolder="tokenizer")

Next, we will define all the steps one by one, to make the last step short and simple.

The first step will be to tokenize the user prompt. It will make a `(1, 77)` shaped `input_ids`.

In [None]:
def tokenize(input):
    return tokenizer([input],
                     padding="max_length",
                     max_length=tokenizer.model_max_length,
                     truncation=True,
                     return_tensors="np")

In [None]:
# Optional
test_tk = tokenize("test tokenizer to see the tokens")
test_tk.input_ids.shape

We run the tokenized prompt through the `Text Encoder` model. It expects the `(1, 77)` data as `int32`.

In [None]:
# Optional
text_encoder.get_parameter_shapes()

In [None]:
def get_embeddings(input):
    return np.array(
        text_encoder.run({"input_ids": input.input_ids.astype(np.int32)
                          })[0]).astype(np.float32)

In [None]:
# Optional
test_emb = get_embeddings(tokenize("test tokenizer to see the tokens"))
test_emb.shape

The other input of the model is latent representation (pure noise). It will be transformed into a 512x512 image later.
The last input will be the timestep.

In [None]:
def generate_latents(seed):
    return torch.randn(
        (1, 4, 64, 64),
        generator=torch.manual_seed(seed),
    )

In [None]:
# Optional
test_latents = generate_latents(42)
latents.shape

Now we add two helpers to access and convert from torch to numpy with the proper datatype.

In [None]:
def get_scaled_sample(latents, t):
    return scheduler.scale_model_input(latents, t).numpy().astype(np.float32)


def get_timestep(t):
    return np.atleast_1d(t.numpy().astype(np.int64))  # convert 0D -> 1D

The UNet model will be run in a loop. It will predict the noise residual.

In [None]:
# Optional
unet.get_parameter_shapes()

In [None]:
def denoise(sample, embeddings, timestep):
    return np.array(
        unet.run({
            "sample": sample,
            "encoder_hidden_states": embeddings,
            "timestep": timestep
        })[0])

Helpers to do the classifier-free guidance and computing the previous noisy sample.

In [None]:
def perform_guidance(noise_pred_uncond, noise_pred_text, scale):
    return noise_pred_uncond + scale * (noise_pred_text - noise_pred_uncond)

def compute_previous(noise_pred, t, latents):
    # compute the previous noisy sample x_t -> x_t-1
    return scheduler.step(noise_pred, t, latents).prev_sample


Scale and decode the image latents with VAE.

In [None]:
def scale_denoised(latents):
    return 1 / 0.18215 * latents


def decode(latents):
    return np.array(
        vae.run({"latent_sample": latents.numpy().astype(np.float32)})[0])

And lastly, we need to convert it to an image to display or save.

In [None]:
def convert_to_rgb_image(image):
    image = np.clip(image / 2 + 0.5, 0, 1)
    image = np.transpose(image, (0, 2, 3, 1))
    images = (image * 255).round().astype("uint8")
    return Image.fromarray(images[0])

def save_image(pil_image, filename="output.png"):
    pil_image.save(filename, format="png")

Feel free to play around with these params.

In [None]:
prompt = "a photograph of an astronaut riding a horse"
negative_prompt = ""
steps = 20
seed = 13
scale = 7.0

And now, to put everything together and run the whole pipeline:

In [None]:
scheduler.set_timesteps(steps)

text_input, uncond_input = tokenize(prompt), tokenize(negative_prompt)
text_embeddings, uncond_embeddings = get_embeddings(
    text_input), get_embeddings(uncond_input)
latents = generate_latents(seed) * scheduler.init_noise_sigma

for t in tqdm(scheduler.timesteps):
    sample = get_scaled_sample(latents, t)
    timestep = get_timestep(t)

    noise_pred_uncond = denoise(sample, uncond_embeddings, timestep)
    noise_pred_text = denoise(sample, text_embeddings, timestep)

    noise_pred = perform_guidance(noise_pred_uncond, noise_pred_text, scale)
    latents = compute_previous(torch.from_numpy(noise_pred), t, latents)

latents = scale_denoised(latents)
result = decode(latents)
image = convert_to_rgb_image(result)

# show the image
image

If you like the generated image, save it with the following:

In [None]:
save_image(image, "output.png")