# 无条件图像生成扩散模型————用来生成美丽的蝴蝶图像

## 准备工作

In [None]:
# Optional one-off setup commands, kept for reference:
#!pip install python-lsp-server[all]
#!export HF_ENDPOINT='https://hf-mirror.com/'
import os
# Point the Hugging Face endpoint at the hf-mirror.com mirror. The variable is
# set before `huggingface_hub` is imported below -- presumably so the library
# sees it when it is first loaded (TODO confirm).
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com/'
# To make the setting persistent for new shells:
# !echo 'export HF_ENDPOINT="https://hf-mirror.com"' >> ~/.bashrc
from huggingface_hub import notebook_login
# Log in to the Hugging Face Hub from inside the notebook.
notebook_login()

In [None]:
# Install/upgrade the libraries used by this notebook; pyarrow is pinned to 9.0.0.
%pip install -U diffusers datasets transformers accelerate ftfy pyarrow==9.0.0 matplotlib

In [None]:
import numpy as np
import torch
import torch.nn.functional as F
from matplotlib import pyplot as plt
from PIL import Image


def show_images(x):
    """Arrange a batch of image tensors into one grid and return it as a PIL image.

    Args:
        x: Batch of images as a tensor with values in (-1, 1). It is handed
            to ``torchvision.utils.make_grid`` (assumed layout is
            (batch, channels, height, width) -- TODO confirm against callers).

    Returns:
        The grid as an 8-bit ``PIL.Image.Image``.
    """
    # Imported locally so the helper works regardless of which notebook cells
    # have already run and of whatever the global name ``Image`` happens to be
    # bound to at call time.
    import torchvision
    from PIL import Image as PILImage

    x = x * 0.5 + 0.5  # Map from (-1, 1) back to (0, 1)
    grid = torchvision.utils.make_grid(x)
    # Channels-first -> channels-last, clamp to [0, 1], then scale to [0, 255].
    grid_im = grid.detach().cpu().permute(1, 2, 0).clip(0, 1) * 255
    return PILImage.fromarray(np.array(grid_im).astype(np.uint8))


def make_grid(images, size=64):
    """Paste PIL images left to right into a single horizontal strip.

    Args:
        images: List of PIL images.
        size: Edge length in pixels that every image is resized to.

    Returns:
        An RGB PIL image of width ``size * len(images)`` and height ``size``.
    """
    strip = Image.new("RGB", (size * len(images), size))
    x_offset = 0
    for tile in images:
        strip.paste(tile.resize((size, size)), (x_offset, 0))
        x_offset += size
    return strip


# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
# Mac users may need device = 'mps' (untested)
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

## 核心内容
- 可以直接使用 `DiffusionPipeline.from_pretrained` 获取 pipeline；根据模型不同，生成图像时可以传入 `num_inference_steps`（采样步数）和 `guidance_scale`（输出与提示词的匹配程度）等参数
- 也可自行构建model和scheduler组成pipeline，这里使用UNet2DModel 和 DDPMScheduler

### 数据集下载和预处理

In [None]:
from diffusers import DiffusionPipeline

# Community models: https://huggingface.co/sd-dreambooth-library
# Alternative sources that can be swapped in:
#   model_id = "/root/lanyun-tmp/data"
#   model_id = "sd-dreambooth-library/mr-potato-head"
model_id = "/root/diffussion/my_pipeline/"

# Load the pipeline from local files only (local_files_only=True), then move
# it to the selected device.
# Half-precision variant: also pass torch_dtype=torch.float16 to from_pretrained.
pipe = DiffusionPipeline.from_pretrained(model_id, local_files_only=True)
pipe = pipe.to(device)

# Generate a batch of 8 images and show them side by side.
images = pipe(batch_size=8).images
make_grid(images)

In [None]:
import torchvision
# The datasets feature class is imported under an alias: importing it as
# ``Image`` would rebind the name that ``from PIL import Image`` introduced at
# the top of the notebook, and PIL calls such as ``Image.new`` rely on it.
from datasets import Image as DatasetImage
from datasets import load_dataset
from torchvision import transforms

# Download the dataset to local disk:
# !huggingface-cli download --resume-download --repo-type dataset huggan/smithsonian_butterflies_subset --local-dir /root/lanyun-tmp/data/smithsonian_butterflies_subset

# Pick the load_dataset builder that matches the local file format.
# More ways to load local files: https://huggingface.co/docs/datasets/loading
# The images in this parquet file are stored as raw bytes, so the "image"
# column ("image" is the key of the picture in each record) is cast to the
# datasets Image feature, which decodes it into PIL images.
# More ways to load image data: https://huggingface.co/docs/datasets/image_load
base_url = "/root/lanyun-tmp/data/smithsonian_butterflies_subset/data/"
data_files = {"train": base_url + "train-00000-of-00001.parquet"}
dataset = load_dataset("parquet", data_files=data_files, split="train").cast_column(
    "image", DatasetImage()
)


# Training resolution: square images of this many pixels per side (32 here;
# larger sizes can be tried as well).
image_size = 32
# Lower this if the GPU runs out of memory.
batch_size = 64

# Preprocessing / augmentation pipeline applied to every PIL image.
preprocess = transforms.Compose([
    transforms.Resize((image_size, image_size)),  # scale to the training size
    transforms.RandomHorizontalFlip(),  # random mirror as augmentation
    transforms.ToTensor(),  # PIL image -> tensor in (0, 1)
    transforms.Normalize(mean=[0.5], std=[0.5]),  # (0, 1) -> (-1, 1)
])


def transform(examples):
    """Convert a batch of dataset records into preprocessed image tensors.

    Args:
        examples: Mapping whose "image" entry is an iterable of PIL images.

    Returns:
        A dict with one key, "images", holding the list of outputs of
        ``preprocess`` applied to each image after conversion to RGB.
    """
    tensors = []
    for picture in examples["image"]:
        tensors.append(preprocess(picture.convert("RGB")))
    return {"images": tensors}

# Register `transform` so it is applied on the fly whenever the dataset is accessed.
dataset.set_transform(transform)

# Serve the transformed images in shuffled batches.
train_dataloader = torch.utils.data.DataLoader(
    dataset=dataset,
    shuffle=True,
    batch_size=batch_size,
)

### 查看数据集内容

In [None]:
# Explicit alias for PIL's module: the bare name ``Image`` is also used by the
# datasets library for its image feature class, which has no ``Resampling``.
from PIL import Image as PILImage

# Take the first 8 images of one batch; slicing before the transfer means only
# those 8 are moved to the device.
xb = next(iter(train_dataloader))["images"][:8].to(device)
print("X shape:", xb.shape)
# Upscale the grid with nearest-neighbour resampling so the pixels stay crisp.
show_images(xb).resize((8 * 64, 64), resample=PILImage.Resampling.NEAREST)

### 构建模型

In [None]:
from diffusers import UNet2DModel

# Build the UNet and move it to the selected device in one expression.
model = UNet2DModel(
    sample_size=image_size,  # target image resolution
    in_channels=3,  # RGB input
    out_channels=3,  # same number of channels out
    layers_per_block=2,  # ResNet layers per UNet block
    block_out_channels=(64, 128, 128, 256),  # more channels -> more parameters
    # Encoder: two plain ResNet downsampling blocks, then two with spatial
    # self-attention.
    down_block_types=(
        "DownBlock2D",
        "DownBlock2D",
        "AttnDownBlock2D",
        "AttnDownBlock2D",
    ),
    # Decoder mirrors the encoder: attention blocks first, plain blocks last.
    up_block_types=(
        "AttnUpBlock2D",
        "AttnUpBlock2D",
        "UpBlock2D",
        "UpBlock2D",
    ),
).to(device)

In [None]:
from diffusers import DDPMScheduler

# Build the noisy inputs for a quick forward-pass check. `noisy_xb` and
# `timesteps` are not defined by any earlier cell, so they are created here
# from the preview batch `xb`.
preview_scheduler = DDPMScheduler(num_train_timesteps=1000)
# One timestep per preview image, spread evenly from 0 to 999.
timesteps = torch.linspace(0, 999, xb.shape[0]).long().to(device)
noise = torch.randn_like(xb)
noisy_xb = preview_scheduler.add_noise(xb, noise, timesteps)

# No gradients are needed for a shape check.
with torch.no_grad():
    model_prediction = model(noisy_xb, timesteps).sample
model_prediction.shape

### 模型训练

In [None]:
import time

from diffusers import DDPMScheduler

start_time = time.time()

# Noise scheduler used to corrupt the training images.
noise_scheduler = DDPMScheduler(
    num_train_timesteps=1000, beta_schedule="squaredcos_cap_v2"
)

optimizer = torch.optim.AdamW(model.parameters(), lr=4e-4)

# Per-step loss values across all epochs.
losses = []
num_batches = len(train_dataloader)

for epoch in range(30):
    for batch in train_dataloader:
        clean_images = batch["images"].to(device)
        bs = clean_images.shape[0]

        # Gaussian noise with the same shape, dtype and device as the batch.
        noise = torch.randn_like(clean_images)

        # Draw one random timestep per image.
        timesteps = torch.randint(
            0,
            noise_scheduler.config.num_train_timesteps,
            (bs,),
            device=clean_images.device,
        )

        # Add noise to the clean images according to each image's timestep.
        noisy_images = noise_scheduler.add_noise(clean_images, noise, timesteps)

        # The model is trained to predict the noise that was added.
        noise_pred = model(noisy_images, timesteps, return_dict=False)[0]

        loss = F.mse_loss(noise_pred, noise)
        # Plain backward(): passing the loss itself as the `gradient` argument
        # would multiply every gradient by the current loss value.
        loss.backward()
        losses.append(loss.item())

        # Update the parameters, then clear the gradients for the next step.
        optimizer.step()
        optimizer.zero_grad()

    # Every 5 epochs, report the mean loss over the most recent epoch.
    if (epoch + 1) % 5 == 0:
        loss_last_epoch = sum(losses[-num_batches:]) / num_batches
        print(f"Epoch:{epoch+1}, loss: {loss_last_epoch}")

end_time = time.time()
training_time = end_time - start_time
print(f"Training time: {training_time} seconds")


In [None]:
# Plot the per-step training loss on a linear scale (left) and a log scale (right).
fig, axs = plt.subplots(1, 2, figsize=(12, 4))
axs[0].plot(losses)
axs[1].plot(np.log(losses))
# Save the figure to a file before showing it.
fig.savefig('loss.png')
plt.show()
# Print the minimum explicitly: a bare expression in the middle of a cell is
# evaluated but never displayed.
print("Min loss:", np.min(losses))

### 模型预测
这里采用了两种方法：一种是用 DDPMPipeline 将 model 和 noise_scheduler 封装成 pipeline 后直接调用；另一种是直接使用 model 和 noise_scheduler，手动执行逐步去噪的采样循环

In [None]:
from diffusers import DDPMPipeline

# Wrap the trained UNet and the noise scheduler in a pipeline, run it once
# with its default arguments, and pick the first generated image.
image_pipe = DDPMPipeline(scheduler=noise_scheduler, unet=model)
pipeline_output = image_pipe()
pipeline_output.images[0]
# Optional: persist the pipeline to disk.
# image_pipe.save_pretrained("my_pipeline")

# Random starting point: 8 pure-noise RGB images at the training resolution
# (`image_size` is the same value the UNet was built with).
sample = torch.randn(8, 3, image_size, image_size).to(device)

# Sampling needs no gradients, so the whole loop runs under no_grad.
with torch.no_grad():
    for t in noise_scheduler.timesteps:
        # Get the model's prediction for the current sample at timestep t.
        residual = model(sample, t).sample

        # Let the scheduler compute the sample for the previous timestep.
        sample = noise_scheduler.step(residual, t, sample).prev_sample

show_images(sample)