https://zhuanlan.zhihu.com/p/621325215  
https://github.com/CompVis/stable-diffusion  
https://developer.aliyun.com/article/1238471  
https://github.com/huggingface/diffusers/tree/main  
https://zhuanlan.zhihu.com/p/617134893（原理+代码说明）  
https://jalammar.github.io/illustrated-stable-diffusion/（可视化sd）

需要安装以下软件：  
pip install transformers  
pip install einops  
pip install omegaconf   
下载一个预训练模型：

## 导入库或模块

In [1]:
import numpy as np
import torch
import torch.nn as nn
import PIL
from PIL import Image


## 定义读取、保存图片函数

In [2]:
#载入图片
def load_image(path):
    """Read an image file and return it as a float tensor scaled to [-1, 1].

    Args:
        path: Location of the image file; it is opened with PIL and
            converted to RGB.

    Returns:
        A float32 ``torch.Tensor`` laid out as (1, 3, H, W), i.e.
        (1, 3, 512, 512) for a 512x512 input.
    """
    rgb = Image.open(path).convert("RGB")
    pixels = np.array(rgb).astype(np.float32) / 255.0     # (H, W, 3), values in [0, 1]
    batched = pixels[None].transpose(0, 3, 1, 2)          # (1, 3, H, W)
    tensor = torch.from_numpy(batched)
    # Shift from [0, 1] to [-1, 1].
    return 2. * tensor - 1.

#保存图片
def save_image(samples, path):
    """Save the first image of a batch of [-1, 1] tensors to ``path``.

    Args:
        samples: Tensor laid out as (N, 3, H, W) with values nominally in
            [-1, 1]; values outside that range are clamped. It may live on
            any device and may be attached to an autograd graph.
        path: Destination file name handed to ``PIL.Image.save``.
    """
    # Map [-1, 1] to [0, 255]; the clamp keeps out-of-range values valid.
    pixels = 255 * (samples / 2 + 0.5).clamp(0, 1)        # (N, 3, H, W)
    # Tensor.numpy() only accepts CPU tensors, so move the data first;
    # .cpu() is a no-op for tensors that are already on the CPU.
    pixels = pixels.detach().cpu().numpy()
    pixels = pixels.transpose(0, 2, 3, 1)                 # (N, H, W, 3)
    first = pixels[0]                                     # (H, W, 3)
    Image.fromarray(first.astype(np.uint8)).save(path)

def test_load_and_save_img():
    """Round-trip ``girl.png`` through load_image/save_image into ``girll01.png``."""
    save_image(load_image("girl.png"), "girll01.png")

# Run the load/save round-trip right away; it reads girl.png via a relative path.
test_load_and_save_img()

## 导入VAE模型
load_vae为根据配置init_config去初始化模型，然后从预训练模型model.ckpt中读取参数（这是一个可以用于根据文本提示生成和修改图像的模型。它是一个潜在扩散模型（ Latent Diffusion Model），使用固定的、预训练的文本编码器（CLIP ViT-L/14）），预训练模型的first_stage_model即指代VAE模型
vae模型分为编码和解码两个部分，下图展示test_vae的功能：通过vae.encode将图像编码为隐特征，再通过vae.decode将隐特征还原为图像。运行程序后，输出图像vae.png应与输入图像girl-horse.png基本一致（VAE是有损压缩，细节可能略有差异）。
![image.png](attachment:image.png)

预训练模型的详细信息官网：  
https://huggingface.co/CompVis/stable-diffusion-v-1-4-original

In [3]:
from ldm.models.autoencoder import AutoencoderKL
#VAE模型
def load_vae(ckpt_path="../data/sd-v1-4.ckpt"):
    """Build the AutoencoderKL and fill it with weights from a checkpoint.

    Args:
        ckpt_path: Path of the checkpoint file. It is read with
            ``torch.load`` and must contain a ``"state_dict"`` entry in which
            the VAE weights are stored under the ``first_stage_model.``
            prefix. Defaults to the path this notebook originally hard-coded.

    Returns:
        The ``AutoencoderKL`` instance in eval mode. No device move is
        performed here.

    Raises:
        KeyError: If the checkpoint lacks a weight the model expects.
    """
    # Architecture hyper-parameters handed straight to AutoencoderKL.
    init_config = {
        "embed_dim": 4,
        "monitor": "val/rec_loss",
        "ddconfig":{
          "double_z": True,
          "z_channels": 4,
          "resolution": 256,
          "in_channels": 3,
          "out_ch": 3,
          "ch": 128,
          "ch_mult":[1,2,4,4],
          "num_res_blocks": 2,
          "attn_resolutions": [],
          "dropout": 0.0,
        },
        "lossconfig":{
          "target": "torch.nn.Identity"
        }
    }
    vae = AutoencoderKL(**init_config)

    # NOTE(review): torch.load unpickles the file, so only open trusted checkpoints.
    checkpoint = torch.load(ckpt_path, map_location="cpu")
    weights = checkpoint["state_dict"]

    # Overwrite the values of the model's own state dict in place so the
    # mapping object returned by state_dict() is passed back unchanged in type.
    model_dict = vae.state_dict()
    for key in list(model_dict):
        model_dict[key] = weights["first_stage_model." + key]
    vae.load_state_dict(model_dict, strict=False)

    vae.eval()
    return vae

#测试vae模型
def test_vae():
    """Encode ``girl-horse.png`` with the VAE, decode it again, save ``vae.png``."""
    model = load_vae()
    source = load_image("girl-horse.png")          # (1, 3, 512, 512)
    posterior = model.encode(source)
    latent = posterior.sample()                    # (1, 4, 64, 64)
    reconstruction = model.decode(latent)          # (1, 3, 512, 512)
    save_image(reconstruction, "vae.png")

In [4]:
# Execute the VAE encode/decode round-trip; the result is written to vae.png.
test_vae()

making attention of type 'vanilla' with 512 in_channels
Working with z of shape (1, 4, 32, 32) = 4096 dimensions.
making attention of type 'vanilla' with 512 in_channels


模型生成的图像vae.png
![vae.png](attachment:vae.png)

## CLIP文本编码
下列代码展示CLIP文本编码的调用方法，先用CLIPTokenizer将提示词转换为token，再将token转换为(1, 77, 768)的编码
![image.png](attachment:image.png)

In [5]:
from transformers import CLIPTextModel, CLIPTokenizer
# 文本编码
def prompts_embedding(prompts):
    """Encode text prompts with the CLIP ViT-L/14 text encoder.

    The tokenizer and the text model are loaded on every call.

    Args:
        prompts: List of prompt strings.

    Returns:
        The first element of the text encoder's output; for a single prompt
        this is a tensor of shape (1, 77, 768).
    """
    model_id = "openai/clip-vit-large-patch14"
    tokenizer = CLIPTokenizer.from_pretrained(model_id)
    text_encoder = CLIPTextModel.from_pretrained(model_id)

    # Pad/truncate every prompt to tokenizer.model_max_length (77) tokens.
    tokens = tokenizer(
        prompts,
        padding="max_length",
        max_length=tokenizer.model_max_length,
        truncation=True,
        return_tensors="pt",
    )

    encoder_output = text_encoder(tokens.input_ids)
    return encoder_output[0]  # (1, 77, 768)

def test_embedding():
    """Encode a sample prompt and the empty prompt, then print both shapes."""
    prompts = ["a photograph of an astronaut riding a horse"]
    text_embeddings = prompts_embedding(prompts)

    # The empty prompt provides the unconditional embedding.
    uncond_prompts = [""]
    uncond_embeddings = prompts_embedding(uncond_prompts)

    print("text_embeddings.shape", text_embeddings.shape)
    # Label fixed: this line used to be printed as "text_embeddings.shape" too.
    print("uncond_embeddings.shape", uncond_embeddings.shape)

In [6]:
# Print the embedding shapes for the sample prompt and the empty prompt.
test_embedding()

Some weights of the model checkpoint at openai/clip-vit-large-patch14 were not used when initializing CLIPTextModel: ['vision_model.encoder.layers.19.layer_norm2.bias', 'vision_model.encoder.layers.9.layer_norm2.bias', 'vision_model.encoder.layers.9.self_attn.k_proj.weight', 'vision_model.encoder.layers.10.mlp.fc2.bias', 'vision_model.encoder.layers.19.self_attn.v_proj.weight', 'vision_model.encoder.layers.8.self_attn.q_proj.weight', 'vision_model.encoder.layers.16.mlp.fc1.weight', 'vision_model.encoder.layers.21.mlp.fc1.weight', 'vision_model.encoder.layers.12.mlp.fc2.weight', 'vision_model.encoder.layers.11.self_attn.q_proj.weight', 'vision_model.encoder.layers.15.self_attn.k_proj.weight', 'vision_model.encoder.layers.11.mlp.fc2.weight', 'vision_model.encoder.layers.17.mlp.fc1.weight', 'vision_model.encoder.layers.2.self_attn.out_proj.bias', 'vision_model.encoder.layers.3.mlp.fc1.weight', 'vision_model.encoder.layers.5.self_attn.v_proj.bias', 'vision_model.encoder.layers.3.mlp.fc2.we

Some weights of the model checkpoint at openai/clip-vit-large-patch14 were not used when initializing CLIPTextModel: ['vision_model.encoder.layers.19.layer_norm2.bias', 'vision_model.encoder.layers.9.layer_norm2.bias', 'vision_model.encoder.layers.9.self_attn.k_proj.weight', 'vision_model.encoder.layers.10.mlp.fc2.bias', 'vision_model.encoder.layers.19.self_attn.v_proj.weight', 'vision_model.encoder.layers.8.self_attn.q_proj.weight', 'vision_model.encoder.layers.16.mlp.fc1.weight', 'vision_model.encoder.layers.21.mlp.fc1.weight', 'vision_model.encoder.layers.12.mlp.fc2.weight', 'vision_model.encoder.layers.11.self_attn.q_proj.weight', 'vision_model.encoder.layers.15.self_attn.k_proj.weight', 'vision_model.encoder.layers.11.mlp.fc2.weight', 'vision_model.encoder.layers.17.mlp.fc1.weight', 'vision_model.encoder.layers.2.self_attn.out_proj.bias', 'vision_model.encoder.layers.3.mlp.fc1.weight', 'vision_model.encoder.layers.5.self_attn.v_proj.bias', 'vision_model.encoder.layers.3.mlp.fc2.we

text_embeddings.shape torch.Size([1, 77, 768])
text_embeddings.shape torch.Size([1, 77, 768])


测试代码test_embedding中，编码了"a photograph of an astronaut riding a horse"和“”（空字符串）两个文本。此处仅做代码展示，因此模型加载了两次；若实际使用，可以仅加载一次模型。代码中tokenizer的输出text_input是一个包含input_ids和attention_mask两个字段的对象，其中input_ids即是每个token的id。参数tokenizer.model_max_length默认为77，即一句话中最大的token个数，因此input_ids的长度也为77；49407为填充字符，代表空的token。

## UNet模型
UNet是stable-diffusion一个核心构件，如下图所示。
![image.png](attachment:image.png)  

加载UNet的代码如下所示，先根据配置unet_init_config构建模型，然后从预训练模型model.ckpt的“state_dict.model.diffusion_model”中获取参数


In [7]:
from ldm.modules.diffusionmodules.openaimodel import UNetModel

#加载unet模型
def load_unet(ckpt_path="../data/sd-v1-4.ckpt"):
    """Build the UNetModel, load its weights from a checkpoint, move it to CUDA.

    Args:
        ckpt_path: Path of the checkpoint file. It is read with
            ``torch.load`` and must contain a ``"state_dict"`` entry in which
            the UNet weights are stored under the ``model.diffusion_model.``
            prefix. Defaults to the path this notebook originally hard-coded.

    Returns:
        The ``UNetModel`` instance on the CUDA device, in eval mode.

    Raises:
        KeyError: If the checkpoint lacks a weight the model expects.
    """
    # Architecture hyper-parameters handed straight to UNetModel.
    unet_init_config = {
            "image_size": 32, # unused
            "in_channels": 4,
            "out_channels": 4,
            "model_channels": 320,
            "attention_resolutions": [ 4, 2, 1 ],
            "num_res_blocks": 2,
            "channel_mult": [ 1, 2, 4, 4 ],
            "num_heads": 8,
            "use_spatial_transformer": True,
            "transformer_depth": 1,
            "context_dim": 768,
            "use_checkpoint": True,
            "legacy": False,
    }
    unet = UNetModel(**unet_init_config)

    # NOTE(review): torch.load unpickles the file, so only open trusted checkpoints.
    checkpoint = torch.load(ckpt_path, map_location="cpu")
    weights = checkpoint["state_dict"]

    # Overwrite the values of the model's own state dict in place so the
    # mapping object returned by state_dict() is passed back unchanged in type.
    model_dict = unet.state_dict()
    for key in list(model_dict):
        model_dict[key] = weights["model.diffusion_model." + key]

    unet.load_state_dict(model_dict, strict=False)
    unet.cuda()
    unet.eval()
    return unet

def test_unet():
    """Smoke-test the UNet with random inputs and print the output shape."""
    # Random stand-ins for the VAE latent and the text embedding.
    fake_latent = torch.randn(1, 4, 64, 64).cuda()
    fake_context = torch.randn(1, 77, 768).cuda()
    step = torch.tensor([0]).cuda()

    model = load_unet()
    # All three inputs are already on the GPU.
    prediction = model(fake_latent, step, fake_context)
    print(prediction.shape)  # (1, 4, 64, 64)

In [8]:
# Run the UNet smoke test; it prints the shape of the model output.
test_unet()

torch.Size([1, 4, 64, 64])


在test_unet中，可以看到UNet有三个输入：一个是隐特征latent，大小为（N，4，64，64）；一个是timestamp，代表当前步数，大小为（N,）（代码中为torch.tensor([0])）；另一个是文本编码，大小为（N, 77, 768）。而UNet的输出为（N，4，64，64）的隐特征。

## 调度器
下图展示stable-diffusion文生图的整个过程，会经过多个UNet的推理步骤，而每个步骤会有不同参数。我们编写一个“调度器”的类，来处理每个步骤的计算。
![image.png](attachment:image.png)
以下代码实现的是lms discrete调度器（Linear multistep scheduler for discrete beta schedules）

In [9]:
from scipy import integrate

#参考https://github.com/huggingface/diffusers/blob/main/src/diffusers/schedulers/scheduling_lms_discrete.py
class lms_scheduler():
    """Linear multistep (LMS) sampler over a discrete sigma schedule.

    Typical use: construct, call ``set_timesteps`` once, then call ``step``
    once per denoising iteration with the index of that iteration.
    """

    def __init__(self, beta_start=0.00085, beta_end=0.012, num_train_timesteps=1000):
        """Precompute the beta/alpha tables of the training noise schedule.

        Args:
            beta_start: First beta of the schedule.
            beta_end: Last beta of the schedule.
            num_train_timesteps: Number of entries in the schedule.
        """
        self.num_train_timesteps = num_train_timesteps
        # Interpolate sqrt(beta) linearly, then square.
        self.betas = np.linspace(beta_start**0.5, beta_end**0.5, num_train_timesteps, dtype=np.float32) ** 2
        self.alphas = 1.0 - self.betas
        # Cumulative product of the alphas, one entry per training timestep.
        self.alphas_cumprod = np.cumprod(self.alphas, axis=0)
        # Most recent derivatives, oldest first; reset by set_timesteps().
        self.derivatives = []

    def set_timesteps(self, num_inference_steps=100):
        """Choose the inference timesteps and compute their sigmas.

        Sets ``self.timesteps`` (``num_inference_steps`` evenly spaced,
        possibly fractional values from the last training timestep down to 0)
        and ``self.sigmas`` (one sigma per timestep plus a trailing 0.0), and
        clears the derivative history.
        """
        self.num_inference_steps = num_inference_steps
        last_timestep = len(self.alphas_cumprod) - 1
        self.timesteps = np.linspace(last_timestep, 0, num_inference_steps, dtype=float)

        # Fractional timesteps fall between two table entries: interpolate
        # linearly between the neighbouring integer timesteps.
        low_idx = np.floor(self.timesteps).astype(int)
        high_idx = np.ceil(self.timesteps).astype(int)
        frac = np.mod(self.timesteps, 1.0)

        # One sigma per training timestep.
        sigmas = np.array(((1 - self.alphas_cumprod) / self.alphas_cumprod) ** 0.5)
        sigmas = (1 - frac) * sigmas[low_idx] + frac * sigmas[high_idx]
        # Append a final 0.0 so that sigmas[t + 1] exists for the last step.
        self.sigmas = np.concatenate([sigmas, [0.0]]).astype(np.float32)
        self.derivatives = []

    def get_lms_coefficient(self, order, t, current_order):
        """Integrate one Lagrange basis polynomial over [sigmas[t], sigmas[t + 1]].

        Args:
            order: Number of interpolation nodes (sigmas[t], ..., sigmas[t - order + 1]).
            t: Index of the current step in ``self.sigmas``.
            current_order: Which node's basis polynomial to integrate.

        Returns:
            The integral as a Python float (computed with ``scipy.integrate.quad``).
        """
        def lms_derivative(tau):
            prod = 1.0
            for k in range(order):
                if current_order == k:
                    continue
                prod *= (tau - self.sigmas[t - k]) / (self.sigmas[t - current_order] - self.sigmas[t - k])
            return prod

        integrated_coeff = integrate.quad(lms_derivative, self.sigmas[t], self.sigmas[t + 1], epsrel=1e-4)[0]

        return integrated_coeff

    def step(self, model_output, timestep, sample, order=4):
        """Advance ``sample`` from sigmas[timestep] to sigmas[timestep + 1].

        Args:
            model_output: Noise predicted by the model for ``sample``.
            timestep: Index of the current step into ``self.sigmas`` (the
                loop counter, not the value stored in ``self.timesteps``).
            sample: Current noisy latents.
            order: Maximum number of past derivatives to combine.

        Returns:
            The latents for the next step.
        """
        sigma = self.sigmas[timestep]
        pred_original_sample = sample - sigma * model_output
        derivative = (sample - pred_original_sample) / sigma
        self.derivatives.append(derivative)
        # Keep only the `order` most recent derivatives.
        self.derivatives = self.derivatives[-order:]

        # The usable order is limited by the derivatives actually stored, not
        # only by the step index: when sampling starts in the middle of the
        # schedule (as img2img does) the history is empty on the first call,
        # and pairing higher-order coefficients with a single derivative
        # would give a wrong update.
        order = min(timestep + 1, len(self.derivatives), order)
        lms_coeffs = [self.get_lms_coefficient(order, timestep, curr_order) for curr_order in range(order)]
        # Coefficient i belongs to the i-th most recent derivative.
        prev_sample = sample + sum(coeff * past for coeff, past in zip(lms_coeffs, reversed(self.derivatives)))
        return prev_sample

## 文生图
有了上面各个组件作为基础，便可以将它们组装起来，实现stable-diffusion文生图和图生图的功能。以下是文生图的实现。其实就是各组件的组合。为了程序的简洁，此处有一些可以优化性能的地方，例如调用了两次unet，实际可以组合一起作为一个批次去调用；程序中也可以看到guidance_scale的实现，它是将文本条件为输入的隐特征和空输入的隐特征做混合。  
guidance_scale即CFG指数（Classifier-free guidance），是一个控制文本提示对扩散过程影响程度的值。简单来说，就是在去噪（采样）阶段，将有条件控制下预测的噪声和无条件下预测的噪声组合在一起，来确定最终的噪声。通常guidance_scale可以选7-8.5之间；如果使用非常大的值，图像可能看起来不错，但多样性会降低。

![image.png](attachment:image.png)
其中w代表CFG，当w越大时，condition起的作用越大，即生成的图像和输入文本更一致；当w被设置为0时，图像生成是无条件的，文本提示会被忽略。

In [20]:
def txt2img():
    """Generate an image from a fixed text prompt and save it as ``txt2img.png``.

    Steps: encode the prompt and the empty prompt with CLIP, start from
    Gaussian noise scaled by the first sigma of the schedule, run the UNet
    once per prompt at every step, blend the two noise predictions with
    classifier-free guidance, advance the latents with the LMS scheduler and
    finally decode the latents with the VAE.
    """
    guidance_scale = 7.5

    denoiser = load_unet()
    sampler = lms_scheduler()
    sampler.set_timesteps(100)

    # Other prompts that were used for the sample images:
    #   "a photograph of an astronaut riding a horse"
    #   "a photograph of a girl riding a horse"
    cond = prompts_embedding(["paradise consmic beach"]).cuda()   # (1, 77, 768)
    uncond = prompts_embedding([""]).cuda()                       # (1, 77, 768)

    # Initial latents: unit Gaussian noise multiplied by the first sigma.
    noise = torch.randn((1, 4, 64, 64))
    latents = (noise * sampler.sigmas[0]).cuda()

    for step_index, t in enumerate(sampler.timesteps):
        sigma = sampler.sigmas[step_index]
        # The UNet input is the latent divided by sqrt(sigma^2 + 1).
        unet_input = latents / ((sigma**2 + 1) ** 0.5)
        timestamp = torch.tensor([t]).cuda()

        # Combining the conditional and unconditional predictions tends to
        # improve image quality (empirical); a larger guidance_scale should
        # make the image follow the prompt more closely.
        with torch.no_grad():
            eps_cond = denoiser(unet_input, timestamp, cond)
            eps_uncond = denoiser(unet_input, timestamp, uncond)
            eps = eps_uncond + guidance_scale * (eps_cond - eps_uncond)
            latents = sampler.step(eps, step_index, latents)

    decoder = load_vae()
    # Latents are rescaled by 1/0.18215 before being handed to the VAE decoder.
    image = decoder.decode((1 / 0.18215 * latents).cpu())  # (1, 3, 512, 512)
    save_image(image, "txt2img.png")

In [15]:
# Generate an image from the hard-coded prompt; the result is written to txt2img.png.
txt2img()

Some weights of the model checkpoint at openai/clip-vit-large-patch14 were not used when initializing CLIPTextModel: ['vision_model.encoder.layers.19.layer_norm2.bias', 'vision_model.encoder.layers.9.layer_norm2.bias', 'vision_model.encoder.layers.9.self_attn.k_proj.weight', 'vision_model.encoder.layers.10.mlp.fc2.bias', 'vision_model.encoder.layers.19.self_attn.v_proj.weight', 'vision_model.encoder.layers.8.self_attn.q_proj.weight', 'vision_model.encoder.layers.16.mlp.fc1.weight', 'vision_model.encoder.layers.21.mlp.fc1.weight', 'vision_model.encoder.layers.12.mlp.fc2.weight', 'vision_model.encoder.layers.11.self_attn.q_proj.weight', 'vision_model.encoder.layers.15.self_attn.k_proj.weight', 'vision_model.encoder.layers.11.mlp.fc2.weight', 'vision_model.encoder.layers.17.mlp.fc1.weight', 'vision_model.encoder.layers.2.self_attn.out_proj.bias', 'vision_model.encoder.layers.3.mlp.fc1.weight', 'vision_model.encoder.layers.5.self_attn.v_proj.bias', 'vision_model.encoder.layers.3.mlp.fc2.we

Some weights of the model checkpoint at openai/clip-vit-large-patch14 were not used when initializing CLIPTextModel: ['vision_model.encoder.layers.19.layer_norm2.bias', 'vision_model.encoder.layers.9.layer_norm2.bias', 'vision_model.encoder.layers.9.self_attn.k_proj.weight', 'vision_model.encoder.layers.10.mlp.fc2.bias', 'vision_model.encoder.layers.19.self_attn.v_proj.weight', 'vision_model.encoder.layers.8.self_attn.q_proj.weight', 'vision_model.encoder.layers.16.mlp.fc1.weight', 'vision_model.encoder.layers.21.mlp.fc1.weight', 'vision_model.encoder.layers.12.mlp.fc2.weight', 'vision_model.encoder.layers.11.self_attn.q_proj.weight', 'vision_model.encoder.layers.15.self_attn.k_proj.weight', 'vision_model.encoder.layers.11.mlp.fc2.weight', 'vision_model.encoder.layers.17.mlp.fc1.weight', 'vision_model.encoder.layers.2.self_attn.out_proj.bias', 'vision_model.encoder.layers.3.mlp.fc1.weight', 'vision_model.encoder.layers.5.self_attn.v_proj.bias', 'vision_model.encoder.layers.3.mlp.fc2.we

making attention of type 'vanilla' with 512 in_channels
Working with z of shape (1, 4, 32, 32) = 4096 dimensions.
making attention of type 'vanilla' with 512 in_channels


运行程序可以得到文本“a photograph of an astronaut riding a horse”生成的图片，实现文生图的效果。
![txt2img-2.png](attachment:txt2img-2.png)

a photograph of a girl riding a horse生成的图片，实现文生图的效果
![txt2img.png](attachment:txt2img.png)

paradise consmic beach
![txt2img.png](attachment:txt2img.png)

![txt2img.png](attachment:txt2img.png)

## 图生图
图生图的过程与文生图相似，不同的是，图生图以输入图像的隐特征作为初始步骤的隐特征，如下图所示。
![image.png](attachment:image.png)
它与文生图的结构一致，而循环的起点不再从0开始，改为由“强度”START_STRENGTH来调配，以便控制输入图像所占的比重。

In [18]:
def img2img():
    """Re-draw ``beach.png`` guided by a fixed prompt and save ``img2img.png``.

    Same loop as text-to-image, except that the starting latents are the
    VAE encoding of the input image plus noise, and the scheduler steps
    before index START_STRENGTH are skipped.
    """
    guidance_scale = 7.5
    START_STRENGTH = 45

    denoiser = load_unet().cuda()
    sampler = lms_scheduler()
    sampler.set_timesteps(100)

    cond = prompts_embedding(["a seagull flying"]).cuda()   # (1, 77, 768)
    uncond = prompts_embedding([""]).cuda()                 # (1, 77, 768)

    # Encode the input image; the sampled latent is multiplied by 0.18215.
    vae = load_vae()
    source = load_image("beach.png")
    init_latent = vae.encode(source).sample().cuda()*0.18215

    noise_latents = torch.randn((1, 4, 64, 64), device="cuda")
    print("xxxx init_latent ",init_latent.shape)
    print("xxxx noise_latents ",noise_latents.shape)
    # Noise the image latent to the level of the step where sampling starts.
    latents = init_latent + noise_latents * sampler.sigmas[START_STRENGTH]

    for i, t in enumerate(sampler.timesteps):
        print(i,t)
        if i >= START_STRENGTH:
            sigma = sampler.sigmas[i]
            # The UNet input is the latent divided by sqrt(sigma^2 + 1).
            unet_input = (latents / ((sigma**2 + 1) ** 0.5)).cuda()
            timestamp = torch.tensor([t]).cuda()

            with torch.no_grad():
                eps_cond = denoiser(unet_input, timestamp, cond.cuda())
                eps_uncond = denoiser(unet_input, timestamp, uncond.cuda())
                eps = eps_uncond + guidance_scale * (eps_cond - eps_uncond)
                latents = sampler.step(eps, i, latents)

    # Latents are rescaled by 1/0.18215 before being handed to the VAE decoder.
    image = vae.decode((1 / 0.18215 * latents).cpu())
    save_image(image, "img2img.png")




In [19]:
# Run image-to-image generation; the result is written to img2img.png.
img2img()

Some weights of the model checkpoint at openai/clip-vit-large-patch14 were not used when initializing CLIPTextModel: ['vision_model.encoder.layers.19.layer_norm2.bias', 'vision_model.encoder.layers.9.layer_norm2.bias', 'vision_model.encoder.layers.9.self_attn.k_proj.weight', 'vision_model.encoder.layers.10.mlp.fc2.bias', 'vision_model.encoder.layers.19.self_attn.v_proj.weight', 'vision_model.encoder.layers.8.self_attn.q_proj.weight', 'vision_model.encoder.layers.16.mlp.fc1.weight', 'vision_model.encoder.layers.21.mlp.fc1.weight', 'vision_model.encoder.layers.12.mlp.fc2.weight', 'vision_model.encoder.layers.11.self_attn.q_proj.weight', 'vision_model.encoder.layers.15.self_attn.k_proj.weight', 'vision_model.encoder.layers.11.mlp.fc2.weight', 'vision_model.encoder.layers.17.mlp.fc1.weight', 'vision_model.encoder.layers.2.self_attn.out_proj.bias', 'vision_model.encoder.layers.3.mlp.fc1.weight', 'vision_model.encoder.layers.5.self_attn.v_proj.bias', 'vision_model.encoder.layers.3.mlp.fc2.we

Some weights of the model checkpoint at openai/clip-vit-large-patch14 were not used when initializing CLIPTextModel: ['vision_model.encoder.layers.19.layer_norm2.bias', 'vision_model.encoder.layers.9.layer_norm2.bias', 'vision_model.encoder.layers.9.self_attn.k_proj.weight', 'vision_model.encoder.layers.10.mlp.fc2.bias', 'vision_model.encoder.layers.19.self_attn.v_proj.weight', 'vision_model.encoder.layers.8.self_attn.q_proj.weight', 'vision_model.encoder.layers.16.mlp.fc1.weight', 'vision_model.encoder.layers.21.mlp.fc1.weight', 'vision_model.encoder.layers.12.mlp.fc2.weight', 'vision_model.encoder.layers.11.self_attn.q_proj.weight', 'vision_model.encoder.layers.15.self_attn.k_proj.weight', 'vision_model.encoder.layers.11.mlp.fc2.weight', 'vision_model.encoder.layers.17.mlp.fc1.weight', 'vision_model.encoder.layers.2.self_attn.out_proj.bias', 'vision_model.encoder.layers.3.mlp.fc1.weight', 'vision_model.encoder.layers.5.self_attn.v_proj.bias', 'vision_model.encoder.layers.3.mlp.fc2.we

making attention of type 'vanilla' with 512 in_channels
Working with z of shape (1, 4, 32, 32) = 4096 dimensions.
making attention of type 'vanilla' with 512 in_channels
xxxx init_latent  torch.Size([1, 4, 64, 64])
xxxx noise_latents  torch.Size([1, 4, 64, 64])
0 999.0
1 988.9090909090909
2 978.8181818181819
3 968.7272727272727
4 958.6363636363636
5 948.5454545454545
6 938.4545454545455
7 928.3636363636364
8 918.2727272727273
9 908.1818181818181
10 898.0909090909091
11 888.0
12 877.9090909090909
13 867.8181818181818
14 857.7272727272727
15 847.6363636363636
16 837.5454545454545
17 827.4545454545455
18 817.3636363636364
19 807.2727272727273
20 797.1818181818181
21 787.090909090909
22 777.0
23 766.9090909090909
24 756.8181818181818
25 746.7272727272727
26 736.6363636363636
27 726.5454545454545
28 716.4545454545455
29 706.3636363636363
30 696.2727272727273
31 686.1818181818181
32 676.090909090909
33 666.0
34 655.9090909090909
35 645.8181818181818
36 635.7272727272727
37 625.6363636363636


![img2img-2.png](attachment:img2img-2.png)

In [9]:
from ldm.models.autoencoder import AutoencoderKL
#VAE模型
def load_vae(ckpt_path="model.ckpt"):
    """Build the AutoencoderKL and fill it with weights from a checkpoint.

    Args:
        ckpt_path: Path of the checkpoint file. It is read with
            ``torch.load`` and must contain a ``"state_dict"`` entry in which
            the VAE weights are stored under the ``first_stage_model.``
            prefix. Defaults to the path this cell originally hard-coded.

    Returns:
        The ``AutoencoderKL`` instance in eval mode. No device move is
        performed here.

    Raises:
        KeyError: If the checkpoint lacks a weight the model expects.
    """
    # Architecture hyper-parameters handed straight to AutoencoderKL.
    init_config = {
        "embed_dim": 4,
        "monitor": "val/rec_loss",
        "ddconfig":{
          "double_z": True,
          "z_channels": 4,
          "resolution": 256,
          "in_channels": 3,
          "out_ch": 3,
          "ch": 128,
          "ch_mult":[1,2,4,4],
          "num_res_blocks": 2,
          "attn_resolutions": [],
          "dropout": 0.0,
        },
        "lossconfig":{
          "target": "torch.nn.Identity"
        }
    }
    vae = AutoencoderKL(**init_config)

    # NOTE(review): torch.load unpickles the file, so only open trusted checkpoints.
    checkpoint = torch.load(ckpt_path, map_location="cpu")
    weights = checkpoint["state_dict"]

    # Overwrite the values of the model's own state dict in place so the
    # mapping object returned by state_dict() is passed back unchanged in type.
    model_dict = vae.state_dict()
    for key in list(model_dict):
        model_dict[key] = weights["first_stage_model." + key]
    vae.load_state_dict(model_dict, strict=False)

    vae.eval()
    return vae

#测试vae模型
def test_vae():
    """Encode ``girl_and_horse.png`` with the VAE, decode it again, save ``vae.png``."""
    model = load_vae()
    source = load_image("girl_and_horse.png")      # (1, 3, 512, 512)
    posterior = model.encode(source)
    latent = posterior.sample()                    # (1, 4, 64, 64)
    reconstruction = model.decode(latent)          # (1, 3, 512, 512)
    save_image(reconstruction, "vae.png")

#test_vae()


from transformers import CLIPTextModel, CLIPTokenizer
# 文本编码
def prompts_embedding(prompts):
    """Encode text prompts with the CLIP ViT-L/14 text encoder.

    The tokenizer and the text model are loaded on every call.

    Args:
        prompts: List of prompt strings.

    Returns:
        The first element of the text encoder's output; for a single prompt
        this is a tensor of shape (1, 77, 768).
    """
    model_id = "openai/clip-vit-large-patch14"
    tokenizer = CLIPTokenizer.from_pretrained(model_id)
    text_encoder = CLIPTextModel.from_pretrained(model_id)

    # Pad/truncate every prompt to tokenizer.model_max_length (77) tokens.
    tokens = tokenizer(
        prompts,
        padding="max_length",
        max_length=tokenizer.model_max_length,
        truncation=True,
        return_tensors="pt",
    )

    encoder_output = text_encoder(tokens.input_ids)
    return encoder_output[0]  # (1, 77, 768)

def test_embedding():
    """Encode a sample prompt and the empty prompt, then print both shapes."""
    prompts = ["a photograph of an astronaut riding a horse"]
    text_embeddings = prompts_embedding(prompts)

    # The empty prompt provides the unconditional embedding.
    uncond_prompts = [""]
    uncond_embeddings = prompts_embedding(uncond_prompts)

    print("text_embeddings.shape", text_embeddings.shape)
    # Label fixed: this line used to be printed as "text_embeddings.shape" too.
    print("uncond_embeddings.shape", uncond_embeddings.shape)
    

#test_embedding()


from ldm.modules.diffusionmodules.openaimodel import UNetModel

#加载unet模型
def load_unet(ckpt_path="model.ckpt"):
    """Build the UNetModel, load its weights from a checkpoint, move it to CUDA.

    Args:
        ckpt_path: Path of the checkpoint file. It is read with
            ``torch.load`` and must contain a ``"state_dict"`` entry in which
            the UNet weights are stored under the ``model.diffusion_model.``
            prefix. Defaults to the path this cell originally hard-coded.

    Returns:
        The ``UNetModel`` instance on the CUDA device, in eval mode.

    Raises:
        KeyError: If the checkpoint lacks a weight the model expects.
    """
    # Architecture hyper-parameters handed straight to UNetModel.
    unet_init_config = {
            "image_size": 32, # unused
            "in_channels": 4,
            "out_channels": 4,
            "model_channels": 320,
            "attention_resolutions": [ 4, 2, 1 ],
            "num_res_blocks": 2,
            "channel_mult": [ 1, 2, 4, 4 ],
            "num_heads": 8,
            "use_spatial_transformer": True,
            "transformer_depth": 1,
            "context_dim": 768,
            "use_checkpoint": True,
            "legacy": False,
    }
    unet = UNetModel(**unet_init_config)

    # NOTE(review): torch.load unpickles the file, so only open trusted checkpoints.
    checkpoint = torch.load(ckpt_path, map_location="cpu")
    weights = checkpoint["state_dict"]

    # Overwrite the values of the model's own state dict in place so the
    # mapping object returned by state_dict() is passed back unchanged in type.
    model_dict = unet.state_dict()
    for key in list(model_dict):
        model_dict[key] = weights["model.diffusion_model." + key]

    unet.load_state_dict(model_dict, strict=False)
    unet.cuda()
    unet.eval()
    return unet

def test_unet():
    """Smoke-test the UNet with random inputs and print the output shape."""
    # Random stand-ins for the VAE latent and the text embedding.
    fake_latent = torch.randn(1, 4, 64, 64).cuda()
    fake_context = torch.randn(1, 77, 768).cuda()
    step = torch.tensor([0]).cuda()

    model = load_unet()
    # All three inputs are already on the GPU.
    prediction = model(fake_latent, step, fake_context)
    print(prediction.shape)  # (1, 4, 64, 64)


from scipy import integrate

#参考https://github.com/huggingface/diffusers/blob/main/src/diffusers/schedulers/scheduling_lms_discrete.py
class lms_scheduler():
    """Linear multistep (LMS) sampler over a discrete sigma schedule.

    Typical use: construct, call ``set_timesteps`` once, then call ``step``
    once per denoising iteration with the index of that iteration.
    """

    def __init__(self, beta_start=0.00085, beta_end=0.012, num_train_timesteps=1000):
        """Precompute the beta/alpha tables of the training noise schedule.

        Args:
            beta_start: First beta of the schedule.
            beta_end: Last beta of the schedule.
            num_train_timesteps: Number of entries in the schedule.
        """
        self.num_train_timesteps = num_train_timesteps
        # Interpolate sqrt(beta) linearly, then square.
        self.betas = np.linspace(beta_start**0.5, beta_end**0.5, num_train_timesteps, dtype=np.float32) ** 2
        self.alphas = 1.0 - self.betas
        # Cumulative product of the alphas, one entry per training timestep.
        self.alphas_cumprod = np.cumprod(self.alphas, axis=0)
        # Most recent derivatives, oldest first; reset by set_timesteps().
        self.derivatives = []

    def set_timesteps(self, num_inference_steps=100):
        """Choose the inference timesteps and compute their sigmas.

        Sets ``self.timesteps`` (``num_inference_steps`` evenly spaced,
        possibly fractional values from the last training timestep down to 0)
        and ``self.sigmas`` (one sigma per timestep plus a trailing 0.0), and
        clears the derivative history.
        """
        self.num_inference_steps = num_inference_steps
        last_timestep = len(self.alphas_cumprod) - 1
        self.timesteps = np.linspace(last_timestep, 0, num_inference_steps, dtype=float)

        # Fractional timesteps fall between two table entries: interpolate
        # linearly between the neighbouring integer timesteps.
        low_idx = np.floor(self.timesteps).astype(int)
        high_idx = np.ceil(self.timesteps).astype(int)
        frac = np.mod(self.timesteps, 1.0)

        # One sigma per training timestep.
        sigmas = np.array(((1 - self.alphas_cumprod) / self.alphas_cumprod) ** 0.5)
        sigmas = (1 - frac) * sigmas[low_idx] + frac * sigmas[high_idx]
        # Append a final 0.0 so that sigmas[t + 1] exists for the last step.
        self.sigmas = np.concatenate([sigmas, [0.0]]).astype(np.float32)
        self.derivatives = []

    def get_lms_coefficient(self, order, t, current_order):
        """Integrate one Lagrange basis polynomial over [sigmas[t], sigmas[t + 1]].

        Args:
            order: Number of interpolation nodes (sigmas[t], ..., sigmas[t - order + 1]).
            t: Index of the current step in ``self.sigmas``.
            current_order: Which node's basis polynomial to integrate.

        Returns:
            The integral as a Python float (computed with ``scipy.integrate.quad``).
        """
        def lms_derivative(tau):
            prod = 1.0
            for k in range(order):
                if current_order == k:
                    continue
                prod *= (tau - self.sigmas[t - k]) / (self.sigmas[t - current_order] - self.sigmas[t - k])
            return prod

        integrated_coeff = integrate.quad(lms_derivative, self.sigmas[t], self.sigmas[t + 1], epsrel=1e-4)[0]

        return integrated_coeff

    def step(self, model_output, timestep, sample, order=4):
        """Advance ``sample`` from sigmas[timestep] to sigmas[timestep + 1].

        Args:
            model_output: Noise predicted by the model for ``sample``.
            timestep: Index of the current step into ``self.sigmas`` (the
                loop counter, not the value stored in ``self.timesteps``).
            sample: Current noisy latents.
            order: Maximum number of past derivatives to combine.

        Returns:
            The latents for the next step.
        """
        sigma = self.sigmas[timestep]
        pred_original_sample = sample - sigma * model_output
        derivative = (sample - pred_original_sample) / sigma
        self.derivatives.append(derivative)
        # Keep only the `order` most recent derivatives.
        self.derivatives = self.derivatives[-order:]

        # The usable order is limited by the derivatives actually stored, not
        # only by the step index: when sampling starts in the middle of the
        # schedule (as img2img does) the history is empty on the first call,
        # and pairing higher-order coefficients with a single derivative
        # would give a wrong update.
        order = min(timestep + 1, len(self.derivatives), order)
        lms_coeffs = [self.get_lms_coefficient(order, timestep, curr_order) for curr_order in range(order)]
        # Coefficient i belongs to the i-th most recent derivative.
        prev_sample = sample + sum(coeff * past for coeff, past in zip(lms_coeffs, reversed(self.derivatives)))
        return prev_sample
#test_unet()


def txt2img():
    """Generate an image from a fixed text prompt and save it as ``txt2img.png``.

    Steps: encode the prompt and the empty prompt with CLIP, start from
    Gaussian noise scaled by the first sigma of the schedule, run the UNet
    once per prompt at every step, blend the two noise predictions with
    classifier-free guidance, advance the latents with the LMS scheduler and
    finally decode the latents with the VAE.
    """
    guidance_scale = 7.5

    denoiser = load_unet()
    sampler = lms_scheduler()
    sampler.set_timesteps(100)

    cond = prompts_embedding(["a photograph of an astronaut riding a horse"]).cuda()   # (1, 77, 768)
    uncond = prompts_embedding([""]).cuda()                                            # (1, 77, 768)

    # Initial latents: unit Gaussian noise multiplied by the first sigma.
    noise = torch.randn((1, 4, 64, 64))
    latents = (noise * sampler.sigmas[0]).cuda()

    for step_index, t in enumerate(sampler.timesteps):
        sigma = sampler.sigmas[step_index]
        # The UNet input is the latent divided by sqrt(sigma^2 + 1).
        unet_input = latents / ((sigma**2 + 1) ** 0.5)
        timestamp = torch.tensor([t]).cuda()

        # Two separate UNet calls (instead of one batched call) to save GPU memory.
        with torch.no_grad():
            eps_cond = denoiser(unet_input, timestamp, cond)
            eps_uncond = denoiser(unet_input, timestamp, uncond)
            eps = eps_uncond + guidance_scale * (eps_cond - eps_uncond)
            latents = sampler.step(eps, step_index, latents)

    decoder = load_vae()
    # Latents are rescaled by 1/0.18215 before being handed to the VAE decoder.
    image = decoder.decode((1 / 0.18215 * latents).cpu())  # (1, 3, 512, 512)
    save_image(image, "txt2img.png")

#txt2img()


def img2img():
    """Re-draw ``girl2.jpg`` guided by a fixed prompt and save ``img2img.png``.

    Same loop as text-to-image, except that the starting latents are the
    VAE encoding of the input image plus noise, and the scheduler steps
    before index START_STRENGTH are skipped.
    """
    guidance_scale = 7.5
    START_STRENGTH = 45

    denoiser = load_unet().cuda()
    sampler = lms_scheduler()
    sampler.set_timesteps(100)

    cond = prompts_embedding(["a girl watch the sky"]).cuda()   # (1, 77, 768)
    uncond = prompts_embedding([""]).cuda()                     # (1, 77, 768)

    # Encode the input image; the sampled latent is multiplied by 0.18215.
    vae = load_vae()
    source = load_image("girl2.jpg")
    init_latent = vae.encode(source).sample().cuda()*0.18215

    noise_latents = torch.randn((1, 4, 64, 64), device="cuda")
    print("xxxx init_latent ",init_latent.shape)
    print("xxxx noise_latents ",noise_latents.shape)
    # Noise the image latent to the level of the step where sampling starts.
    latents = init_latent + noise_latents * sampler.sigmas[START_STRENGTH]

    for i, t in enumerate(sampler.timesteps):
        print(i,t)
        if i >= START_STRENGTH:
            sigma = sampler.sigmas[i]
            # The UNet input is the latent divided by sqrt(sigma^2 + 1).
            unet_input = (latents / ((sigma**2 + 1) ** 0.5)).cuda()
            timestamp = torch.tensor([t]).cuda()

            with torch.no_grad():
                eps_cond = denoiser(unet_input, timestamp, cond.cuda())
                eps_uncond = denoiser(unet_input, timestamp, uncond.cuda())
                eps = eps_uncond + guidance_scale * (eps_cond - eps_uncond)
                latents = sampler.step(eps, i, latents)

    # Latents are rescaled by 1/0.18215 before being handed to the VAE decoder.
    image = vae.decode((1 / 0.18215 * latents).cpu())
    save_image(image, "img2img.png")


#img2img()

In [6]:
class VectorQuantizer2(nn.Module):
    """
    Vector-quantization layer that replaces each input vector by its nearest
    codebook entry.

    Improved version over VectorQuantizer and usable as a drop-in replacement.
    It mostly avoids costly matrix multiplications and allows for post-hoc
    remapping of indices.
    """
    # NOTE: because of a historical bug the beta factor was applied to the
    # wrong loss term. The buggy form stays the default for backwards
    # compatibility; pass legacy=False to get the corrected weighting.
    def __init__(self, n_e, e_dim, beta, remap=None, unknown_index="random",
                 sane_index_shape=False, legacy=True):
        """
        Args:
            n_e: number of codebook entries.
            e_dim: dimensionality of each codebook entry.
            beta: weight of one of the two loss terms (which one depends on
                ``legacy``).
            remap: optional path given to ``np.load``; the loaded array lists
                the codebook indices that are considered "used".
            unknown_index: ``"random"``, ``"extra"`` or an integer; what to
                report for indices that are not in the used list.
            sane_index_shape: if true, ``forward`` returns indices shaped
                (batch, height, width).
            legacy: selects the historical (default) or corrected loss.
        """
        super().__init__()
        self.n_e = n_e
        self.e_dim = e_dim
        self.beta = beta
        self.legacy = legacy
        self.sane_index_shape = sane_index_shape

        # Codebook, initialised uniformly in [-1/n_e, 1/n_e].
        codebook = nn.Embedding(self.n_e, self.e_dim)
        bound = 1.0 / self.n_e
        codebook.weight.data.uniform_(-bound, bound)
        self.embedding = codebook

        self.remap = remap
        if self.remap is None:
            # Without remapping the index space is the full codebook.
            self.re_embed = n_e
            return

        self.register_buffer("used", torch.tensor(np.load(self.remap)))
        self.re_embed = self.used.shape[0]
        self.unknown_index = unknown_index # "random" or "extra" or integer
        if self.unknown_index == "extra":
            # Reserve one additional slot after the used indices.
            self.unknown_index = self.re_embed
            self.re_embed = self.re_embed+1
        print(f"Remapping {self.n_e} indices to {self.re_embed} indices. "
              f"Using {self.unknown_index} for unknown indices.")

    def remap_to_used(self, inds):
        """Translate full-codebook indices into positions within ``self.used``.

        Indices that do not occur in ``self.used`` become a random position
        (``unknown_index == "random"``) or ``self.unknown_index`` otherwise.
        ``inds`` must have at least two dimensions; the input shape is kept.
        """
        orig_shape = inds.shape
        assert len(orig_shape)>1
        flat = inds.reshape(orig_shape[0],-1)
        used = self.used.to(flat)
        # hits[b, i, k] == 1 where flat[b, i] equals used[k]
        hits = (flat[:,:,None]==used[None,None,...]).long()
        remapped = hits.argmax(-1)
        missing = hits.sum(2)<1
        if self.unknown_index == "random":
            remapped[missing]=torch.randint(0,self.re_embed,size=remapped[missing].shape).to(device=remapped.device)
        else:
            remapped[missing] = self.unknown_index
        return remapped.reshape(orig_shape)

    def unmap_to_all(self, inds):
        """Translate positions within ``self.used`` back to codebook indices.

        When an extra slot exists, positions beyond the used list are set to
        zero first. ``inds`` must have at least two dimensions; the input
        shape is kept.
        """
        orig_shape = inds.shape
        assert len(orig_shape)>1
        flat = inds.reshape(orig_shape[0],-1)
        used = self.used.to(flat)
        n_used = self.used.shape[0]
        if self.re_embed > n_used: # extra token
            # In-place write; reshape may return a view of the argument.
            flat[flat>=n_used] = 0 # simply set to zero
        # One copy of the used list per batch row, as required by gather.
        tiled = used[None,:][flat.shape[0]*[0],:]
        return torch.gather(tiled, 1, flat).reshape(orig_shape)

    def forward(self, z, temp=None, rescale_logits=False, return_logits=False):
        """Quantize ``z`` (batch, channel, height, width).

        The extra keyword arguments exist only for interface compatibility
        with the Gumbel quantizer and must keep their default meaning.

        Returns:
            ``(z_q, loss, (perplexity, min_encodings, indices))`` where
            ``z_q`` has the shape of ``z``, and ``perplexity`` and
            ``min_encodings`` are always ``None``.
        """
        assert temp is None or temp==1.0, "Only for interface compatible with Gumbel"
        assert rescale_logits==False, "Only for interface compatible with Gumbel"
        assert return_logits==False, "Only for interface compatible with Gumbel"
        # channels last, then one row per spatial position
        z = rearrange(z, 'b c h w -> b h w c').contiguous()
        z_flattened = z.view(-1, self.e_dim)

        # squared distances via (z - e)^2 = z^2 + e^2 - 2 e * z
        codebook = self.embedding.weight
        z_sq = torch.sum(z_flattened ** 2, dim=1, keepdim=True)
        e_sq = torch.sum(codebook**2, dim=1)
        cross = torch.einsum('bd,dn->bn', z_flattened, rearrange(codebook, 'n d -> d n'))
        d = z_sq + e_sq - 2 * cross

        min_encoding_indices = torch.argmin(d, dim=1)
        z_q = self.embedding(min_encoding_indices).view(z.shape)
        perplexity = None
        min_encodings = None

        # The two loss terms differ only in which side is detached.
        encoder_term = torch.mean((z_q.detach()-z)**2)
        codebook_term = torch.mean((z_q - z.detach()) ** 2)
        if self.legacy:
            loss = encoder_term + self.beta * codebook_term
        else:
            loss = self.beta * encoder_term + codebook_term

        # straight-through estimator: forward uses z_q, gradients flow to z
        z_q = z + (z_q - z).detach()

        # back to the layout of the input
        z_q = rearrange(z_q, 'b h w c -> b c h w').contiguous()

        if self.remap is not None:
            per_batch = min_encoding_indices.reshape(z.shape[0],-1) # add batch axis
            per_batch = self.remap_to_used(per_batch)
            min_encoding_indices = per_batch.reshape(-1,1) # flatten

        if self.sane_index_shape:
            batch, _, height, width = z_q.shape[0], z_q.shape[1], z_q.shape[2], z_q.shape[3]
            min_encoding_indices = min_encoding_indices.reshape(batch, height, width)

        return z_q, loss, (perplexity, min_encodings, min_encoding_indices)

    def get_codebook_entry(self, indices, shape):
        """Look up codebook vectors for ``indices``.

        ``shape`` is (batch, height, width, channel). When it is given the
        result is reshaped to it and returned channel-first; when it is
        ``None`` (only valid without remapping) the raw lookup is returned.
        """
        if self.remap is not None:
            per_batch = indices.reshape(shape[0],-1) # add batch axis
            per_batch = self.unmap_to_all(per_batch)
            indices = per_batch.reshape(-1) # flatten again

        # quantized latent vectors
        z_q = self.embedding(indices)
        if shape is None:
            return z_q

        # reshape, then move channels in front to match the model layout
        return z_q.view(shape).permute(0, 3, 1, 2).contiguous()


Denoising Diffusion Probabilistic Models (DDPM) 的正向过程（扩散过程）是指给定一个初始图像样本 $x_0$，按照固定的噪声调度逐步加入高斯噪声，经过 $T$ 步后得到近似纯噪声的图像 $x_T$。正向过程是一个不含可学习参数的马尔可夫链，记为 $q$。

具体来说，设 $x_t$ 表示 DDPM 在时间步 $t$ 时的图像状态，$\beta_t$ 表示第 $t$ 步加入噪声的方差。那么，整个正向过程的对数概率可以分解为各步条件概率的对数之和：

$$\log q(x_{1:T} | x_0) = \sum_{t=1}^T \log q(x_t | x_{t-1})$$

其中，$q(x_t | x_{t-1})$ 表示在已知前一时刻状态 $x_{t-1}$ 的情况下，当前时刻 $x_t$ 的条件概率密度函数。在 DDPM 中，该条件概率密度函数是高斯分布，即：

$$q(x_t | x_{t-1}) = N(x_t; \sqrt{1-\beta_t}\, x_{t-1}, \beta_t I)$$

其中，$\sqrt{1-\beta_t}\, x_{t-1}$ 是下一时刻状态的均值，$\beta_t$ 是高斯分布的方差。神经网络参数 $\theta$ 只出现在反向（去噪）过程 $p_{\theta}(x_{t-1} | x_t)$ 中：网络学习从 $x_t$ 预测所加的噪声，从而由噪声图像逐步还原出图像。

https://developer.aliyun.com/article/1238471

- SD默认生成512x512大小的图像，实际上也可以生成其它分辨率的图像，但可能会出现不协调；如果采用多尺度策略训练，会改善这种情况；
- 采用快速的noise scheduler，SD在去噪步数为30～50步时就能生成稳定的图像；
- SD的guidance_scale设置为7～9是比较稳定的，过小和过大都会出现图像质量下降，实际使用中可以根据具体情况灵活调节；
- 可以使用negative prompt来去除不想要的东西来改善图像生成效果；
- 好的prompt对图像生成效果是至关重要的。

上边我们介绍了如何使用SD进行文生图以及一些主要参数，在最后我们也给出文生图这个pipeline的内部流程代码，如下所示：

In [None]:
import torch
from diffusers import AutoencoderKL, UNet2DConditionModel, DDIMScheduler
from transformers import CLIPTextModel, CLIPTokenizer
from tqdm.auto import tqdm
# Text-to-image with the individual diffusers components: load the models,
# encode the prompts, run the denoising loop with classifier-free guidance
# and decode the final latents with the VAE. The decoded batch ends up in
# the module-level name `image`.
# NOTE(review): confirm this model id is still served by the model hub.
model_id = "runwayml/stable-diffusion-v1-5"
# 1. Load the autoencoder
vae = AutoencoderKL.from_pretrained(model_id, subfolder="vae")
# 2. Load the tokenizer and the text encoder
tokenizer = CLIPTokenizer.from_pretrained(model_id, subfolder="tokenizer")
text_encoder = CLIPTextModel.from_pretrained(model_id, subfolder="text_encoder")
# 3. Load the diffusion UNet
unet = UNet2DConditionModel.from_pretrained(model_id, subfolder="unet")
# 4. Define the noise scheduler
noise_scheduler = DDIMScheduler(
    num_train_timesteps=1000,
    beta_start=0.00085,
    beta_end=0.012,
    beta_schedule="scaled_linear",
    clip_sample=False, # don't clip sample, the x0 in stable diffusion not in range [-1, 1]
    set_alpha_to_one=False,
)
# Move the models to the GPU in half precision
device = "cuda"
vae.to(device, dtype=torch.float16)
text_encoder.to(device, dtype=torch.float16)
unet = unet.to(device, dtype=torch.float16)
# Generation parameters
prompt = [
    "A dragon fruit wearing karate belt in the snow",
    "A small cactus wearing a straw hat and neon sunglasses in the Sahara desert",
    "A photo of a raccoon wearing an astronaut helmet, looking out of the window at night",
    "A cute otter in a rainbow whirlpool holding shells, watercolor"
]
height = 512
width = 512
num_inference_steps = 50
guidance_scale = 7.5
negative_prompt = ""
batch_size = len(prompt)
# Random seed
generator = torch.Generator(device).manual_seed(2023)
with torch.no_grad():
    # Text embeddings of the prompts
    text_input = tokenizer(prompt, padding="max_length", max_length=tokenizer.model_max_length, truncation=True, return_tensors="pt")
    text_embeddings = text_encoder(text_input.input_ids.to(device))[0]
    # Unconditional (negative prompt) text embeddings, padded to the same length
    max_length = text_input.input_ids.shape[-1]
    uncond_input = tokenizer(
        [negative_prompt] * batch_size, padding="max_length", max_length=max_length, return_tensors="pt"
    )
    uncond_embeddings = text_encoder(uncond_input.input_ids.to(device))[0]
    # Concatenate into one batch so both branches run in a single UNet call
    text_embeddings = torch.cat([uncond_embeddings, text_embeddings])
    # Initial noise for the latents; the channel count is read from the UNet
    # config (direct attribute access on the model is deprecated)
    latents = torch.randn(
        (batch_size, unet.config.in_channels, height // 8, width // 8),
        generator=generator, device=device
    )
    latents = latents.to(device, dtype=torch.float16)
    # Set the number of sampling steps
    noise_scheduler.set_timesteps(num_inference_steps, device=device)
    # scale the initial noise by the standard deviation required by the scheduler
    latents = latents * noise_scheduler.init_noise_sigma # for DDIM, init_noise_sigma = 1.0
    timesteps_tensor = noise_scheduler.timesteps
    # Do denoise steps
    for t in tqdm(timesteps_tensor):
        # Duplicate the latents so the unconditional prediction is computed in the same pass
        latent_model_input = torch.cat([latents] * 2)
        latent_model_input = noise_scheduler.scale_model_input(latent_model_input, t) # for DDIM, do nothing
        # Predict the noise with the UNet
        noise_pred = unet(latent_model_input, t, encoder_hidden_states=text_embeddings).sample
        # Classifier-free guidance
        noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
        noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)
        # Compute the previous noisy latents: x_t -> x_t-1
        latents = noise_scheduler.step(noise_pred, t, latents).prev_sample

    # The latents must be rescaled before decoding
    latents = 1 / 0.18215 * latents
    # Decode the latents into images with the VAE (kept inside no_grad)
    image = vae.decode(latents).sample

## 图生图代码

In [None]:
import PIL
import numpy as np
import torch
from diffusers import AutoencoderKL, UNet2DConditionModel, DDIMScheduler
from transformers import CLIPTextModel, CLIPTokenizer
from tqdm.auto import tqdm
# Model identifier passed to every from_pretrained call below
model_id = "runwayml/stable-diffusion-v1-5"
# All models run on the GPU in half precision
device = "cuda"
# 1. Autoencoder
vae = AutoencoderKL.from_pretrained(model_id, subfolder="vae")
vae.to(device, dtype=torch.float16)
# 2. Tokenizer and text encoder
tokenizer = CLIPTokenizer.from_pretrained(model_id, subfolder="tokenizer")
text_encoder = CLIPTextModel.from_pretrained(model_id, subfolder="text_encoder")
text_encoder.to(device, dtype=torch.float16)
# 3. Diffusion UNet
unet = UNet2DConditionModel.from_pretrained(model_id, subfolder="unet")
unet = unet.to(device, dtype=torch.float16)
# 4. Noise scheduler
noise_scheduler = DDIMScheduler(
    num_train_timesteps=1000,
    beta_start=0.00085,
    beta_end=0.012,
    beta_schedule="scaled_linear",
    # don't clip sample, the x0 in stable diffusion not in range [-1, 1]
    clip_sample=False,
    set_alpha_to_one=False,
)
# Pre-processing of the init image
def preprocess(image):
    """Turn an image into a batched, channel-first float tensor.

    The image is resized so both sides are multiples of 32 (rounded down),
    converted to float32, given a leading batch axis and moved to
    channel-first order. Values are mapped with ``x / 255 * 2 - 1``, i.e. to
    [-1, 1] assuming 8-bit pixel values.
    """
    width, height = image.size
    # round each side down to an integer multiple of 32
    width = width - width % 32
    height = height - height % 32
    resized = image.resize((width, height), resample=PIL.Image.LANCZOS)
    pixels = np.array(resized).astype(np.float32) / 255.0
    # add the batch axis, then (batch, h, w, c) -> (batch, c, h, w)
    batch = torch.from_numpy(pixels[None].transpose(0, 3, 1, 2))
    return 2.0 * batch - 1.0
# Image-to-image generation: encode the init image, add noise up to the
# timestep selected by `strength`, denoise with classifier-free guidance and
# decode the result. The decoded batch ends up in the module-level name
# `image`.
# Parameters
prompt = ["A fantasy landscape, trending on artstation"]
num_inference_steps = 50
guidance_scale = 7.5
strength = 0.8
batch_size = 1
negative_prompt = ""
generator = torch.Generator(device).manual_seed(2023)
init_image = PIL.Image.open("init_image.png").convert("RGB")
with torch.no_grad():
    # Text embeddings of the prompt
    text_input = tokenizer(prompt, padding="max_length", max_length=tokenizer.model_max_length, truncation=True, return_tensors="pt")
    text_embeddings = text_encoder(text_input.input_ids.to(device))[0]
    # Unconditional (negative prompt) text embeddings, padded to the same length
    max_length = text_input.input_ids.shape[-1]
    uncond_input = tokenizer(
        [negative_prompt] * batch_size, padding="max_length", max_length=max_length, return_tensors="pt"
    )
    uncond_embeddings = text_encoder(uncond_input.input_ids.to(device))[0]
    # Concatenate into one batch
    text_embeddings = torch.cat([uncond_embeddings, text_embeddings])
    # Set the number of sampling steps
    noise_scheduler.set_timesteps(num_inference_steps, device=device)
    # Keep only the last `strength` fraction of the schedule
    init_timestep = min(int(num_inference_steps * strength), num_inference_steps)
    t_start = max(num_inference_steps - init_timestep, 0)
    timesteps = noise_scheduler.timesteps[t_start:]
    # Pre-process the init image and encode it into scaled latents
    init_input = preprocess(init_image)
    init_latents = vae.encode(init_input.to(device, dtype=torch.float16)).latent_dist.sample(generator)
    init_latents = 0.18215 * init_latents
    # Add noise to init_latents at the first timestep that will be run
    noise = torch.randn(init_latents.shape, generator=generator, device=device, dtype=init_latents.dtype)
    init_latents = noise_scheduler.add_noise(init_latents, noise, timesteps[:1])
    latents = init_latents # used as the initial latents
    # Do denoise steps
    for t in tqdm(timesteps):
        # Duplicate the latents so the unconditional prediction is computed in the same pass
        latent_model_input = torch.cat([latents] * 2)
        latent_model_input = noise_scheduler.scale_model_input(latent_model_input, t) # for DDIM, do nothing
        # Predict the noise
        noise_pred = unet(latent_model_input, t, encoder_hidden_states=text_embeddings).sample
        # Classifier-free guidance
        noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
        noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)
        # Compute the previous noisy latents: x_t -> x_t-1
        latents = noise_scheduler.step(noise_pred, t, latents).prev_sample

    # The latents must be rescaled before decoding
    latents = 1 / 0.18215 * latents
    # Decode (kept inside no_grad)
    image = vae.decode(latents).sample