In [None]:
# 扩散模型演示

本笔记本展示了扩散模型的基本概念和使用方法。我们将通过简单的例子，展示扩散模型是如何工作的，以及如何使用我们的框架进行训练和推理。


In [None]:
## 1. 基本导入


In [None]:
import os
import sys
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision.utils import make_grid
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
from PIL import Image

# 添加项目根目录到路径
sys.path.append(os.path.abspath('..'))

# 导入项目模块
from models.diffusion.unet import UNet
from models.diffusion.ddpm import DiffusionModel
from config.model_config import DiffusionConfig
from inference.sampling import DDPMSampler, DDIMSampler

# 设置显示图像函数
def show_images(images, title="", figsize=(10, 10)):
    """显示图像网格"""
    # 如果是PyTorch张量，转换为numpy数组
    if isinstance(images, torch.Tensor):
        images = images.detach().cpu().numpy()
        
    # 调整为[0, 1]范围
    if images.max() > 1.0 or images.min() < 0.0:
        images = (images + 1) / 2.0  # 从[-1, 1]转换到[0, 1]
    images = np.clip(images, 0, 1)
    
    # 调整轴维度
    if images.ndim == 3 and images.shape[0] == 3:  # 单个图像，通道在前
        images = images.transpose(1, 2, 0)
        plt.figure(figsize=figsize)
        plt.imshow(images)
        plt.title(title)
        plt.axis('off')
    else:  # 批次图像
        if images.ndim == 4:  # [B, C, H, W]
            grid = make_grid(torch.tensor(images), nrow=int(np.sqrt(images.shape[0])))
            plt.figure(figsize=figsize)
            plt.imshow(grid.permute(1, 2, 0).numpy())
            plt.title(title)
            plt.axis('off')
        else:  # 单个灰度图像
            plt.figure(figsize=figsize)
            plt.imshow(images, cmap='gray')
            plt.title(title)
            plt.axis('off')


In [None]:
## 2. 扩散模型原理可视化

在这个部分，我们将通过简单的可视化，展示扩散模型的基本概念：前向扩散过程和反向去噪过程。


In [None]:
# 加载一张示例图像
def load_example_image(size=256):
    """加载一张示例图像（用户需要提供路径）"""
    # 这里可以替换为您自己的图像路径
    image_path = "../data/example.jpg"  # 替换为实际图像路径
    
    try:
        img = Image.open(image_path).convert('RGB')
        img = img.resize((size, size))
        # 转换为张量 [0, 1]
        img_tensor = torch.tensor(np.array(img)).float() / 255.0
        # 调整为[C, H, W]格式
        img_tensor = img_tensor.permute(2, 0, 1)
        # 转换到[-1, 1]范围
        img_tensor = img_tensor * 2.0 - 1.0
        return img_tensor
    except Exception as e:
        print(f"无法加载示例图像: {e}")
        # 创建一个简单的合成图像作为替代
        print("使用合成图像代替...")
        # 生成简单的彩色图案
        x = torch.linspace(-1, 1, size)
        y = torch.linspace(-1, 1, size)
        xv, yv = torch.meshgrid(x, y)
        r = torch.sin(xv * 3.14 * 2) * 0.5 + 0.5
        g = torch.cos(yv * 3.14 * 2) * 0.5 + 0.5
        b = torch.sin((xv + yv) * 3.14) * 0.5 + 0.5
        img = torch.stack([r, g, b]) * 2 - 1  # 转换至[-1, 1]范围
        return img

# 加载示例图像
example_image = load_example_image(size=256)

# 显示原始图像
show_images(example_image, title="原始图像", figsize=(6, 6))


In [None]:
# 创建简单的扩散过程
def diffusion_process_visualization(image, num_steps=10):
    """可视化扩散过程"""
    # 创建一个基本的扩散配置
    config = DiffusionConfig()
    
    # 设置时间步
    timesteps = torch.linspace(0, config.timesteps-1, num_steps).long()
    
    # 计算beta值和其他参数
    betas = torch.linspace(config.beta_start, config.beta_end, config.timesteps)
    alphas = 1.0 - betas
    alphas_cumprod = torch.cumprod(alphas, dim=0)
    
    # 可视化扩散过程
    plt.figure(figsize=(15, 6))
    all_images = []
    for i, t in enumerate(timesteps):
        # 计算噪声水平
        alpha_cumprod = alphas_cumprod[t]
        
        # 添加噪声
        noise = torch.randn_like(image)
        noisy_image = torch.sqrt(alpha_cumprod) * image + torch.sqrt(1 - alpha_cumprod) * noise
        
        all_images.append(noisy_image)
    
    # 创建网格
    all_images_tensor = torch.stack(all_images)
    show_images(all_images_tensor, title=f"扩散过程: 从原始图像到噪声 ({num_steps}步)", figsize=(15, 5))

# 可视化扩散过程
diffusion_process_visualization(example_image.unsqueeze(0), num_steps=10)


In [None]:
## 3. 设置小型扩散模型

在这部分，我们将设置一个简单的扩散模型用于演示。


In [None]:
# 创建小型扩散模型
def create_small_diffusion_model():
    """创建小型扩散模型用于演示"""
    # 设置配置
    config = DiffusionConfig()
    config.model_dim = 64        # 小型模型
    config.timesteps = 200       # 减少步数
    config.unet_dim_mults = (1, 2, 4)  # 简化U-Net结构
    
    # 创建U-Net
    unet = UNet(
        in_channels=3,
        model_channels=config.model_dim,
        out_channels=3,
        channel_mult=config.unet_dim_mults,
        time_emb_dim=config.time_emb_dim
    )
    
    # 创建扩散模型
    model = DiffusionModel(unet, config)
    
    return model, config

# 创建模型
model, config = create_small_diffusion_model()
print(f"模型参数数量: {sum(p.numel() for p in model.parameters()):,}")


In [None]:
## 4. 采样演示

下面我们将展示如何从扩散模型采样。为了演示，我们将使用随机权重的模型。在实际应用中，您需要先训练模型。


In [None]:
# 设置设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

# 创建采样器
ddpm_sampler = DDPMSampler(model, config.timesteps, device)
ddim_sampler = DDIMSampler(model, config.timesteps, sampling_steps=50, device=device)

# 设置图像形状
image_shape = (3, 64, 64)  # 较小的图像用于演示

# DDPM采样(较慢)
print("DDPM采样中...")
with torch.no_grad():
    samples_ddpm = ddpm_sampler.sample(
        batch_size=4,
        image_size=image_shape,
        verbose=True
    )
    
# 显示结果
show_images(samples_ddpm, title="DDPM采样结果 (随机权重模型)", figsize=(10, 10))

# DDIM采样(更快)
print("\nDDIM采样中...")
with torch.no_grad():
    samples_ddim = ddim_sampler.sample(
        batch_size=4,
        image_size=image_shape,
        verbose=True
    )
    
# 显示结果
show_images(samples_ddim, title="DDIM采样结果 (随机权重模型)", figsize=(10, 10))


In [None]:
## 5. 训练流程示例

下面是训练流程的示例，由于训练需要数据和较长时间，这里只展示代码框架。


In [None]:
from data.datasets.image_dataset import ImageFolderDataset, get_dataloaders
from training.trainer import Trainer

def train_diffusion_example(data_dir, epochs=10):
    """训练扩散模型的示例流程"""
    # 注意：这个函数只展示了代码框架，实际运行需要准备数据集
    
    # 1. 创建数据集和数据加载器
    dataset = ImageFolderDataset(
        root_dir=data_dir,  # 替换为实际数据目录
        image_size=64       # 使用小图像进行快速训练
    )
    
    train_loader, val_loader = get_dataloaders(
        dataset=dataset,
        batch_size=16,
        val_split=0.1,
        num_workers=2
    )
    
    # 2. 创建模型
    model, config = create_small_diffusion_model()
    model = model.to(device)
    
    # 3. 创建训练器
    trainer = Trainer(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,
        config={"learning_rate": 1e-4, "use_ema": True},
        device=device,
        save_dir="../checkpoints"
    )
    
    # 4. 训练模型
    trainer.train(num_epochs=epochs)
    
    return model, trainer

# 这段代码注释掉，因为需要数据集
# data_dir = "../data/your_dataset_folder"  # 替换为实际数据目录
# trained_model, trainer = train_diffusion_example(data_dir, epochs=5)


In [None]:
## 6. 扩展应用：简单文本条件示例

如何使用文本条件进行生成的简单示例框架。


In [None]:
# 文本条件示例
def text_conditioning_example():
    """文本条件扩散模型示例代码"""
    # 导入文本处理库
    from transformers import AutoTokenizer, AutoModel
    
    # 加载分词器和文本编码器模型
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
    text_encoder = AutoModel.from_pretrained("bert-base-uncased")
    
    # 模拟一些文本描述
    prompts = [
        "一只猫坐在草坪上",
        "日落时的海滩景色",
        "城市夜景",
        "雪山风景"
    ]
    
    # 编码文本 (实际应用中这会被送入条件扩散模型)
    text_tokens = tokenizer(prompts, padding=True, truncation=True, return_tensors="pt")
    text_embeddings = text_encoder(**text_tokens).last_hidden_state.mean(dim=1)
    
    print(f"生成的文本嵌入形状: {text_embeddings.shape}")
    
    # 注意：实际应用中，这些嵌入会被用作扩散模型的条件输入
    # 通常是通过交叉注意力机制或者其他条件方法

# 这个部分注释掉，因为需要安装额外的库
# text_conditioning_example()


In [None]:
## 7. 结论

在这个笔记本中，我们展示了扩散模型的基本概念、采样过程和简单应用。扩散模型是一种强大的生成模型，可以用于各种任务，如图像生成、超分辨率、图像编辑等。

要深入了解扩散模型，可以查看我们的代码库中更多的示例和实现，或者参考论文目录中的相关文献。
