Keep in mind that a Stable Diffusion model is not a single monolithic network: it is a pipeline of separate components, such as a UNet, a VAE, and one or more text encoders.

_Here (at least initially) we focus on and benchmark only the UNet_, because it is the most compute-heavy component. This is also simpler than trying to export every component to ONNX and apply PTQ, QAT, etc. to each of them.

In [None]:
import os
import torch
from diffusers import StableDiffusionPipeline
import time
import statistics
import psutil
import pandas as pd 
from tinydiffusion.utils.logger import LoggerConfig #this works in VS Code because of the .env file
from tinydiffusion.utils.constants import PROMPT

In [None]:
LOGGER = LoggerConfig().logger

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
LOGGER.info(f"Using device: {device}")

Load the Stable Diffusion model (a lightweight or distilled variant can be substituted via `model_id`)

In [None]:
ROOT_DIR = os.path.dirname(os.getcwd())
LOGGER.info(f"Root directory: {ROOT_DIR}")

In [None]:
from tinydiffusion.utils.constants import ModelType

# Example model ID: "stabilityai/stable-diffusion-2-base" (the 512x512 base model)
model_cache_dir = os.path.join(ROOT_DIR, "checkpoints", "stablediffusion")
model_id = ModelType.STABLE_DIFFUSION_2_BASE.value  

Below we load the fp16 variant (as opposed to downloading the fp32 variant and then converting to fp16). [Ref](https://huggingface.co/docs/diffusers/en/using-diffusers/loading#:~:text=There%20are%20two%20important%20arguments%20for%20loading%20variants%3A)
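
As a rough guide to why the fp16 variant matters, weight storage scales linearly with the number of bytes per parameter. The sketch below is a back-of-the-envelope estimate only: `weights_mib` is a hypothetical helper and the parameter count is an illustrative assumption, not a value read from the checkpoint. Once the pipeline is loaded, the real count can be obtained with `sum(p.numel() for p in pipe.unet.parameters())`.

```python
# Bytes per element for the two dtypes discussed above.
BYTES_PER_PARAM = {"float32": 4, "float16": 2}


def weights_mib(num_params: int, dtype: str) -> float:
    """Estimate weight storage in MiB (weights only, no activations or buffers)."""
    return num_params * BYTES_PER_PARAM[dtype] / (1024 ** 2)


# Hypothetical parameter count, chosen only for illustration.
example_params = 800_000_000

fp32_mib = weights_mib(example_params, "float32")
fp16_mib = weights_mib(example_params, "float16")
print(f"fp32: {fp32_mib:.0f} MiB, fp16: {fp16_mib:.0f} MiB")
```

Whatever the actual parameter count is, the fp16 weights take half the space of the fp32 weights, which also halves the download size when the fp16 variant is fetched directly.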

In [None]:
# Load pipeline
if device == "cuda":
    pipe = StableDiffusionPipeline.from_pretrained(
        model_id,
        cache_dir=model_cache_dir,
        variant="fp16",        
        torch_dtype=torch.float16
    )
else:
    # On CPU, load the default fp32 weights
    pipe = StableDiffusionPipeline.from_pretrained(
        model_id,
        cache_dir=model_cache_dir,
        torch_dtype=torch.float32 
    )
pipe = pipe.to(device)
pipe.unet.eval()

[StableDiffusionPipeline.enable_attention_slicing()](https://huggingface.co/docs/diffusers/v0.3.0/en/api/pipelines/stable_diffusion#diffusers.StableDiffusionPipeline.enable_attention_slicing)

In [None]:
# Enable sliced attention to reduce peak VRAM usage (in exchange for a small speed decrease)
pipe.enable_attention_slicing()

Create the UNet inputs. As noted at the beginning of the notebook, we intend to benchmark just the UNet, so we need to construct its inputs explicitly and capture only the UNet output.

See [this](https://medium.com/@onkarmishra/stable-diffusion-explained-1f101284484d) for a quick reference about the stable diffusion architecture.

Instead of running the full text encoder → U-Net denoising loop → VAE decode pipeline, we:

- Generate random latents (standing in for the "noisy" image at some timestep).
- Pick an arbitrary timestep (e.g. 10).
- Encode the text prompt with `pipe.text_encoder`, i.e. whichever text encoder the loaded Stable Diffusion pipeline uses.
- Run just the U-Net forward pass.

This means we don't actually generate a final image.
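
The shape of the random latents follows from the image resolution. A minimal sketch of that arithmetic, assuming the usual Stable Diffusion layout of 4 latent channels and a VAE that downsamples by a factor of 8 (the helper name `latent_shape` and its defaults are illustrative; in the notebook the channel count is read from `pipe.unet.config.in_channels`):

```python
def latent_shape(batch_size, image_height, image_width,
                 latent_channels=4, vae_scale_factor=8):
    """Return the (N, C, H, W) shape of the UNet latent input for a given image size."""
    if image_height % vae_scale_factor or image_width % vae_scale_factor:
        raise ValueError("image size must be divisible by the VAE scale factor")
    return (
        batch_size,
        latent_channels,
        image_height // vae_scale_factor,
        image_width // vae_scale_factor,
    )


# A 512x512 image corresponds to 64x64 latents under these assumptions.
print(latent_shape(1, 512, 512))  # (1, 4, 64, 64)
```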

In [None]:
batch_size = 1
height = width = 64  # latent resolution, i.e. a 512x512 image downsampled 8x by the VAE
latents = torch.randn(
    (batch_size, pipe.unet.config.in_channels, height, width),
    device=device,
    dtype=pipe.unet.dtype
)
timestep = torch.tensor([10], device=device, dtype=torch.int64)  # arbitrary diffusion step
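# Encode the prompt without tracking gradients; only the embeddings are needed
with torch.no_grad():
    text_embeddings = pipe.text_encoder(
        pipe.tokenizer(PROMPT, return_tensors="pt").input_ids.to(device)
    )[0]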

LOGGER.info(f"Text embeddings shape: {text_embeddings.shape}")

Benchmarking

In [None]:
# Metrics
inference_time = []
cpu_mem_usage = []
gpu_mem_usage = []

In [None]:
prompt = PROMPT
num_samples = 10

process = psutil.Process(os.getpid())

#GEN_IMG_SAVE_PATH = os.path.join(os.path.dirname(os.getcwd()), "results", "generated_images")
#os.makedirs(GEN_IMG_SAVE_PATH, exist_ok=True)

#LOGGER.info(f"Generating images. Will be saved to: {GEN_IMG_SAVE_PATH}")

results = []

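for i in range(num_samples):
    # CUDA kernels launch asynchronously, so wait for pending work before reading the clock
    if device == "cuda":
        torch.cuda.synchronize()
    start_time = time.perf_counter()
    with torch.no_grad():
        #image = pipe(prompt, guidance_scale=7.5, num_inference_steps=50).images[0] # this is what we would typically do to generate the image
        noise_pred = pipe.unet(latents, timestep, text_embeddings).sample
    # Wait for the forward pass to finish on the GPU so the full runtime is measured
    if device == "cuda":
        torch.cuda.synchronize()
    end_time = time.perf_counter()
    inference_time.append(end_time - start_time)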

    # Memory usage - START
    cpu_mem = process.memory_info().rss / (1024**2)  # MB
    cpu_mem_usage.append(cpu_mem)

    if device == "cuda":
        gpu_mem = torch.cuda.memory_allocated(0) / (1024**2)  # MB
        gpu_mem_usage.append(gpu_mem)
    else:
        gpu_mem = 0 
    # Memory usage - END

    LOGGER.info(f"UNet inference time: {(end_time - start_time):.4f}s")

LOGGER.info(f"\nAverage inference time: {statistics.mean(inference_time):.4f}s ± {statistics.stdev(inference_time):.4f}s")
LOGGER.info(f"\nAverage CPU memory usage: {statistics.mean(cpu_mem_usage):.2f}MB ± {statistics.stdev(cpu_mem_usage):.2f}MB")
if device == "cuda":
    LOGGER.info(f"\nAverage GPU memory usage: {statistics.mean(gpu_mem_usage):.2f}MB ± {statistics.stdev(gpu_mem_usage):.2f}MB")

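# store results (GPU memory stats are only collected when running on CUDA)
results.append({
    "desc": "stable_diffusion_UNet_GPU" if device == "cuda" else "stable_diffusion_UNet_CPU",
    "avg_inference_time": statistics.mean(inference_time),
    "std_inference_time": statistics.stdev(inference_time),
    "avg_cpu_mem_usage": statistics.mean(cpu_mem_usage),
    "std_cpu_mem_usage": statistics.stdev(cpu_mem_usage),
    # statistics.mean/stdev raise on empty lists, so fall back to None on CPU
    "avg_gpu_mem_usage": statistics.mean(gpu_mem_usage) if gpu_mem_usage else None,
    "std_gpu_mem_usage": statistics.stdev(gpu_mem_usage) if len(gpu_mem_usage) > 1 else None,
})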

The inference time is very small here because each measurement is a single UNet forward pass (one denoising step), whereas generating an image typically runs many steps, e.g. 50.
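
Because a single forward pass is so short, the measurement is sensitive to warm-up effects and outliers. One common way to steady such numbers is to discard a few untimed warm-up runs and report the median alongside the mean. The following is a minimal, framework-agnostic sketch, not the notebook's method: `benchmark`, `warmup`, and `repeats` are hypothetical names, and `fake_forward` is a stand-in for the UNet call. On a GPU, the timed callable would also need to call `torch.cuda.synchronize()` before returning so that queued kernels are included in the measurement.

```python
import statistics
import time


def benchmark(fn, warmup=3, repeats=10):
    """Call fn() `warmup` times untimed, then time `repeats` calls; return stats in seconds."""
    for _ in range(warmup):
        fn()  # untimed: absorbs one-off costs such as lazy initialization
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        timings.append(time.perf_counter() - start)
    return {
        "mean": statistics.mean(timings),
        "median": statistics.median(timings),
        # stdev needs at least two samples
        "stdev": statistics.stdev(timings) if len(timings) > 1 else 0.0,
        "runs": len(timings),
    }


def fake_forward():
    # Stand-in workload; replace with the real forward pass.
    return sum(i * i for i in range(10_000))


stats = benchmark(fake_forward)
print({key: round(value, 6) for key, value in stats.items()})
```

The median is less affected than the mean by an occasional slow run, which makes it a useful companion figure when comparing runs of the same model.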

Save benchmark details as CSV

In [None]:
BENCHMARK_SAVE_PATH = os.path.join(ROOT_DIR, "results", "benchmarks")
os.makedirs(BENCHMARK_SAVE_PATH, exist_ok=True)

In [None]:
df = pd.DataFrame(results)
csv_path = os.path.join(BENCHMARK_SAVE_PATH, "benchmark_results.csv")
df.to_csv(csv_path, index=False)
LOGGER.info(f"Saved benchmark results to {csv_path}")