# Diffusion -- diffusers

## Introduction
diffusers 是由 Hugging Face 开发的扩散模型（生成式模型）工具库，可以便捷地加载并调用预训练模型。  
在下面的部分中，我将演示如何使用diffusers开展实验，祝各位学习愉快😉

## DDPM


diffusers可以有两种调用方式，详细内容可以见[website](https://www.llamafactory.cn/huggingface-docs/diffusers/using-diffusers/write_own_pipeline.html)

第一种就是调用高集成的pipeline，调用简洁，但可拓展性差。

In [None]:
import torch
from diffusers import DDPMPipeline

# Load the pretrained pipeline identified by model_id, then move it to the GPU.
model_id = "google/ddpm-bedroom-256"
ddpm = DDPMPipeline.from_pretrained(model_id)
ddpm = ddpm.to("cuda")
# Uncomment the two lines below to sample one image and write it to disk.
# image = ddpm().images[0]
# image.save("bedroom.png")

第二种方式是将其拆分出 Model 和 Scheduler，调用相对复杂，但是可拓展性强。后面的实验都使用这一种。

In [None]:
from diffusers import DDPMScheduler, UNet2DModel

# Load the denoising network and its noise scheduler from one checkpoint id.
model_id = "google/ddpm-bedroom-256"
model = UNet2DModel.from_pretrained(model_id)
model = model.to("cuda")
scheduler = DDPMScheduler.from_pretrained(model_id)

In [None]:
# Use 50 denoising steps for sampling.
scheduler.set_timesteps(50)
scheduler.timesteps  # bare expression; a notebook only echoes the last expression of a cell
# Starting noise: a batch of one 3-channel square image at the model's configured size.
sample_size = model.config.sample_size
noise = torch.randn(1, 3, sample_size, sample_size, device="cuda")

In [None]:
# Reverse process: start from the noise tensor and let the scheduler turn each
# model prediction into the sample for the next (less noisy) timestep.
input = noise

# Nothing in this loop needs gradients, so disable autograd for all of it.
with torch.no_grad():
    for t in scheduler.timesteps:
        noisy_residual = model(input, t).sample
        previous_noisy_sample = scheduler.step(noisy_residual, t, input).prev_sample
        input = previous_noisy_sample

input

### Exper1
我们将探索 $\hat{x}_0 = \frac{x_t-\sqrt{1-\bar{\alpha}_t}\cdot \epsilon_\theta(x_t, \ t)}{\sqrt{\bar{\alpha}_t}}$ 以及 `input` 的生成 Pattern，其中 $\epsilon_\theta(x_t, \ t)$ 是模型预测的噪声（即代码中的 `noisy_residual`），$x_t$ 对应代码中的 `input`。

In [None]:

def add_text_to_image(image_path, output_path, title, save=False):
    """Open an image, draw ``title`` near its top-left corner, and return it.

    Args:
        image_path: Path of the image file to open.
        output_path: Destination for the annotated image; only used when
            ``save`` is true.
        title: Text drawn in white at pixel offset (10, 10).
        save: When true, also write the annotated image to ``output_path``.

    Returns:
        The annotated PIL image.
    """
    from PIL import Image, ImageDraw, ImageFont

    image = Image.open(image_path)
    draw = ImageDraw.Draw(image)
    try:
        font = ImageFont.truetype("DejaVuSans-Bold.ttf", 36)
    except OSError:
        # DejaVu is not installed on every machine. Labelling is only a
        # visual aid, so fall back to Pillow's built-in font (smaller text)
        # instead of aborting the whole stitching run.
        font = ImageFont.load_default()
    text_position = (10, 10)
    draw.text(text_position, title, font=font, fill='white')
    if save:
        image.save(output_path)
    return image

def savefig(input, name):
    """Convert a tensor to an 8-bit image and write it to ``name``.

    Values are mapped with ``x / 2 + 0.5`` and clamped to [0, 1], so the
    tensor is presumably expected to lie in [-1, 1]. After all size-1
    dimensions are squeezed away the tensor must be 3-D; its first axis is
    moved to the end before the conversion to a PIL image.

    Args:
        input: Tensor to render.
        name: Output file path handed to ``PIL.Image.Image.save``.
    """
    from PIL import Image

    unit_range = (input / 2 + 0.5).clamp(0, 1).squeeze()
    # Move the leading (channel) axis last and scale to 0..255 bytes.
    pixels = (unit_range.permute(1, 2, 0) * 255).round().to(torch.uint8)
    Image.fromarray(pixels.cpu().numpy()).save(name)

In [None]:
import numpy as np
import random
from tqdm import tqdm
import os

def setup_seed(seed):
    """Seed the torch (CPU and CUDA), NumPy and ``random`` generators.

    Also switches cuDNN to its deterministic mode so repeated runs with the
    same seed can be compared.

    Args:
        seed: Integer seed passed unchanged to every generator.
    """
    seeders = (
        torch.manual_seed,
        torch.cuda.manual_seed_all,
        np.random.seed,
        random.seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)
    torch.backends.cudnn.deterministic = True
setup_seed(46)

# Sample with 250 denoising steps, starting from pure Gaussian noise.
scheduler.set_timesteps(250)
input = torch.randn((1, 3, sample_size, sample_size), device="cuda")

os.makedirs("results1", exist_ok=True)

# Wrap the timesteps themselves (not the enumerate object) so tqdm can call
# len() on them and display a total and an ETA.
for index, t in enumerate(tqdm(scheduler.timesteps)):
    with torch.no_grad():
        noisy_residual = model(input, t).sample

    # Move the cumulative alpha for this timestep to wherever the sample lives.
    alpha_cumprod_t = scheduler.alphas_cumprod[t].to(input.device)

    # Estimate of the clean image implied by the current sample and predicted noise.
    x0_hat = (input - (1 - alpha_cumprod_t).sqrt() * noisy_residual) / alpha_cumprod_t.sqrt()

    # visualize: [current sample | predicted noise | x0 estimate], separated by
    # 10-pixel-wide strips of ones (rendered white by savefig).
    shape = x0_hat.shape
    white = torch.ones((shape[0], shape[1], shape[2], 10))
    result = torch.cat([input.cpu(), white, noisy_residual.cpu(), white, x0_hat.cpu()], dim=3)
    savefig(result, f"results1/img{index}.png")

    input = scheduler.step(noisy_residual, t, input).prev_sample

savefig(input, "final1.png")

### Optional
我们可以从生成的结果中，挑选一部分steps，粘贴在一起，方便可视化Pattern

In [15]:
from PIL import Image

# Final images of the runs with 50, 100, 150, 200 and 250 denoising steps.
img_list = [f"results3/final-{steps:03d}.png" for steps in range(50, 300, 50)]


def vertical_stitch(image_paths, output_path):
    """Stack labelled images top-to-bottom and save the result.

    Every image is opened through ``add_text_to_image`` and labelled with its
    own file name (directory and extension removed).

    Args:
        image_paths: Paths of the images to stack, in top-to-bottom order.
        output_path: Where the stitched image is written.

    Raises:
        ValueError: If no image is given or the images differ in width.
    """
    from pathlib import Path

    # Take the label from the file stem rather than a fixed slice, so it is
    # correct for any directory name and extension length.
    images = [add_text_to_image(path, None, Path(path).stem, False) for path in image_paths]
    if not images:
        raise ValueError("image_paths must contain at least one image")

    widths = {img.width for img in images}
    if len(widths) > 1:
        raise ValueError("所有图片的宽度必须一致")

    total_height = sum(img.height for img in images)
    width = images[0].width

    new_image = Image.new('RGB', (width, total_height))

    # Paste each image directly below the previous one.
    current_height = 0
    for img in images:
        new_image.paste(img, (0, current_height))
        current_height += img.height

    new_image.save(output_path)
    print(f"拼接完成，已保存至 {output_path}")


# Stitch the selected images into one tall overview image.
vertical_stitch(image_paths=img_list, output_path="output.png")

拼接完成，已保存至 output.png


我们将 $x_0$ 固定住，进行试验

In [None]:
import random
import torchvision.transforms as transforms
from tqdm import tqdm
import os

def setup_seed(seed):
    """Seed the torch (CPU and CUDA), NumPy and ``random`` generators.

    Also switches cuDNN to its deterministic mode so repeated runs with the
    same seed can be compared.

    Args:
        seed: Integer seed passed unchanged to every generator.
    """
    seeders = (
        torch.manual_seed,
        torch.cuda.manual_seed_all,
        np.random.seed,
        random.seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)
    torch.backends.cudnn.deterministic = True
setup_seed(46)

scheduler.set_timesteps(250)
input = torch.randn((1, 3, sample_size, sample_size), device="cuda")

# Reload the image stored in final1.png as the fixed x0. ToTensor yields
# values in [0, 1] for 8-bit images; rescale to [-1, 1] and add a batch axis.
x0 = Image.open("final1.png")
to_tensor = transforms.ToTensor()
x0 = to_tensor(x0)
x0 = x0*2-1
x0 = x0.to("cuda").unsqueeze(0)

os.makedirs("results2", exist_ok=True)

# Wrap the timesteps themselves (not the enumerate object) so tqdm can call
# len() on them and display a total and an ETA.
for index, t in enumerate(tqdm(scheduler.timesteps)):
    # Move the cumulative alpha for this timestep to wherever the sample lives.
    alpha_cumprod_t = scheduler.alphas_cumprod[t].to(input.device)
    with torch.no_grad():
        # No network call here: the residual is the noise that would map the
        # fixed x0 onto the current sample at this timestep.
        noisy_residual = (input-alpha_cumprod_t.sqrt()*x0)/(1-alpha_cumprod_t).sqrt()

    # visualize: [current sample | residual | fixed x0], separated by
    # 10-pixel-wide strips of ones (rendered white by savefig).
    shape = x0.shape
    white = torch.ones((shape[0], shape[1], shape[2], 10))
    result = torch.cat([input.cpu(), white, noisy_residual.cpu(), white, x0.cpu()], dim=-1)
    savefig(result, f"results2/img{index}.png")

    input = scheduler.step(noisy_residual, t, input).prev_sample

savefig(input, "final2.png")

### Exper2
接下来我们将探索当 denoise steps 改变时，图像生成质量会发生怎样的改变

In [None]:
def sample(scheduler, input, model, save_dir=None, is_record=False) -> torch.Tensor:
    """Run the whole denoising loop and return the final sample.

    Args:
        scheduler: Object whose ``timesteps`` are iterated and whose
            ``step(...)`` result provides ``prev_sample``. Its
            ``alphas_cumprod`` is read only when ``is_record`` is true.
        input: Starting tensor for the loop.
        model: Callable ``model(x, t)`` whose result has a ``.sample``
            attribute, used as the predicted noise.
        save_dir: Output directory, created if missing. Required even though
            it defaults to ``None``.
        is_record: When true, save one strip per step, laid out as
            [current sample | predicted noise | x0 estimate], to
            ``<save_dir>/imgNNN.png``.

    Returns:
        The tensor produced by the last scheduler step.

    Raises:
        ValueError: If ``save_dir`` is empty or ``None``.
    """
    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    if not save_dir:
        raise ValueError("Please write down save dir!")
    os.makedirs(save_dir, exist_ok=True)

    # Wrap the timesteps themselves (not the enumerate object) so tqdm can
    # call len() on them and display a total and an ETA.
    for index, t in enumerate(tqdm(scheduler.timesteps)):
        with torch.no_grad():
            noisy_residual = model(input, t).sample

        if is_record:
            # Follow the sample's device instead of assuming "cuda".
            alpha_cumprod_t = scheduler.alphas_cumprod[t].to(input.device)
            x0_hat = (input - (1 - alpha_cumprod_t).sqrt() * noisy_residual) / alpha_cumprod_t.sqrt()
            # visualize
            shape = x0_hat.shape
            white = torch.ones((shape[0], shape[1], shape[2], 10))
            result = torch.cat([input.cpu(), white, noisy_residual.cpu(), white, x0_hat.cpu()], dim=3)
            # No leading "/" in the file name: os.path.join drops every
            # earlier component once it meets an absolute one, which would
            # send the frames to the filesystem root instead of save_dir.
            savefig(result, os.path.join(save_dir, "img%03d.png" % index))

        input = scheduler.step(noisy_residual, t, input).prev_sample

    return input

In [None]:
import numpy as np
import random
from tqdm import tqdm
import os

def setup_seed(seed):
    """Seed the torch (CPU and CUDA), NumPy and ``random`` generators.

    Also switches cuDNN to its deterministic mode so repeated runs with the
    same seed can be compared.

    Args:
        seed: Integer seed passed unchanged to every generator.
    """
    seeders = (
        torch.manual_seed,
        torch.cuda.manual_seed_all,
        np.random.seed,
        random.seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)
    torch.backends.cudnn.deterministic = True
setup_seed(46)

# Numbers of denoising steps to compare: 250, 200, 150, 100, 50.
timesteps = list(range(250, 0, -50))
# One starting noise tensor shared by every run, so only the step count varies.
input = torch.randn((1, 3, sample_size, sample_size), device="cuda")

for timestep in tqdm(timesteps):
    scheduler.set_timesteps(timestep)
    output = sample(scheduler, input, model, save_dir="results3", is_record=False)
    savefig(output, f"results3/final-{timestep:03d}.png")
