# DreamBooth 实现 (附理论解释)

这个 Notebook 实现了 DreamBooth 算法 ([Ruiz et al., 2023](https://arxiv.org/abs/2208.12242))，用于微调预训练的文本到图像扩散模型（如 Stable Diffusion），使其能够学习并生成用户提供的特定主体（对象或风格）的新图像。

**核心思想:** 通过少量（通常 3-5 张）关于特定主体的图像，微调整个预训练模型，同时使用一个特殊的**稀有标识符**（如 `sks`）和一个**类别名词**（如 `dog`）来构建提示词。为了防止模型在学习新主体时忘记原有的类别知识（**语言漂移**问题），引入了**先验保留损失**。

它包含以下部分：
1.  **依赖项检查和导入**
2.  **理论基础与辅助函数定义** (数据集加载、稀有令牌查找、类别图像生成等)
3.  **训练函数** (`dreambooth_training`)
4.  **推理函数** (`inference`)
5.  **配置和执行** (设置参数并运行训练或推理)

## 1. 依赖项检查和导入

In [1]:
import os
import random
import numpy as np
import argparse
from pathlib import Path
from tqdm.auto import tqdm
from PIL import Image

# 首先检查依赖项
def check_dependencies():
    missing_deps = []
    try:
        import torch
    except ImportError:
        missing_deps.append("torch")
    
    try:
        import accelerate
    except ImportError:
        missing_deps.append("accelerate")
        
    try:
        import transformers
    except ImportError:
        missing_deps.append("transformers")
        
    try:
        import diffusers
    except ImportError:
        missing_deps.append("diffusers")
    
    return missing_deps

missing = check_dependencies()
if missing:
    print("缺少必要的依赖项，请先安装：")
    print(f"pip install {' '.join(missing)}")
    print("\n如果遇到Flash Attention相关错误，请尝试：")
    print("pip install diffusers==0.19.3 transformers==4.30.2 accelerate xformers")
    # 在Notebook中，我们通常不直接退出，而是抛出错误或显示消息
    raise ImportError(f"缺少依赖: {', '.join(missing)}")
else:
    print("所有基本依赖项已满足。")

# 正常导入，现在有更好的错误处理
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

try:
    # 尝试导入主要模块
    from diffusers import StableDiffusionPipeline, DDPMScheduler, UNet2DConditionModel
    from transformers import CLIPTextModel, CLIPTokenizer
    print("Diffusers 和 Transformers 导入成功。")
except ImportError as e:
    print(f"导入错误: {e}")
    print("\n请尝试安装兼容版本的依赖项:")
    print("pip install diffusers==0.19.3 transformers==4.30.2 accelerate")
    print("如果您使用NVIDIA GPU，可以添加: xformers")
    raise e

from accelerate import Accelerator
print(f"PyTorch 版本: {torch.__version__}")
print(f"CUDA 可用: {torch.cuda.is_available()}")

  from .autonotebook import tqdm as notebook_tqdm


所有基本依赖项已满足。
Diffusers 和 Transformers 导入成功。
PyTorch 版本: 2.7.0+cu128
CUDA 可用: True
Diffusers 和 Transformers 导入成功。
PyTorch 版本: 2.7.0+cu128
CUDA 可用: True


## 2. 理论基础与辅助函数定义

### 2.1 数据集准备 (`DreamBoothDataset`)

DreamBooth 需要两种类型的图像数据：

1.  **实例图像 (Instance Images):** 用户提供的、包含特定主体的少量图像（例如，你的宠物的照片）。这些图像用于教会模型这个新主体的具体外观。
2.  **类别图像 (Class Images):** 由预训练模型自身生成的、属于同一类别的图像（例如，各种不同的狗的照片）。这些图像用于**先验保留**，确保模型在学习新主体时，不会忘记生成该类别其他实例的能力。

数据集类 `DreamBoothDataset` 负责加载这两种图像，进行预处理（裁剪、缩放、归一化），并在训练时提供给模型。训练时，模型会同时看到实例图像和类别图像。

In [None]:
class DreamBoothDataset(Dataset):
    def __init__(self, instance_images_path, class_images_path, tokenizer, size=512, center_crop=True):
        self.size = size
        self.center_crop = center_crop
        self.tokenizer = tokenizer
        
        # 简化图像加载
        self.instance_images = self._load_images(instance_images_path) if os.path.exists(instance_images_path) else []
        self.class_images = self._load_images(class_images_path) if class_images_path and os.path.exists(class_images_path) else []
        print(f"加载了 {len(self.instance_images)} 张实例图像和 {len(self.class_images)} 张类别图像")
    
    def _load_images(self, path):
        images = []
        if not path:
            return images
            
        print(f"从 {path} 加载图像...")
        for file in tqdm(os.listdir(path), desc=f"加载 {os.path.basename(path)}"):
            if file.lower().endswith(('.png', '.jpg', '.jpeg')):
                try:
                    img = Image.open(os.path.join(path, file)).convert('RGB')
                    # 简化图像处理
                    if self.center_crop:
                        w, h = img.size
                        min_dim = min(w, h)
                        img = img.crop(((w-min_dim)//2, (h-min_dim)//2, (w+min_dim)//2, (h+min_dim)//2))
                    img = img.resize((self.size, self.size))
                    images.append(img)
                except Exception as e:
                    print(f"警告: 跳过无法加载的图像 {file}: {e}")
        return images
    
    def __len__(self):
        # 确保至少有一个实例图像
        if not self.instance_images:
             raise ValueError("错误: 实例图像目录为空或无法加载任何图像。请检查 instance_data_dir。")
        # 返回实例图像和类别图像中较大的数量，以确保训练时两者都能被充分利用
        return max(len(self.instance_images), len(self.class_images))
    
    def __getitem__(self, idx):
        item = {}
        # 获取实例图像 (循环使用，确保每个 epoch 都能覆盖所有类别图像)
        instance_image = self.instance_images[idx % len(self.instance_images)]
        instance_image_tensor = torch.from_numpy(np.array(instance_image).astype(np.float32) / 127.5 - 1.0)
        item["instance_pixel_values"] = instance_image_tensor.permute(2, 0, 1)
        
        # 获取类别图像 (如果存在，同样循环使用)
        if self.class_images:
            class_image = self.class_images[idx % len(self.class_images)]
            class_image_tensor = torch.from_numpy(np.array(class_image).astype(np.float32) / 127.5 - 1.0)
            item["class_pixel_values"] = class_image_tensor.permute(2, 0, 1)
            
        return item

### 2.2 稀有标识符 (`find_rare_token`)

为了让模型能够区分我们想要生成的特定主体和它所属的一般类别，DreamBooth 引入了一个**唯一的、稀有的标识符**（论文中表示为 `[V]`）。这个标识符通常是从词汇表中随机选择的一个在预训练数据中出现频率很低的词（例如 `sks`, `xqv`）。

训练时，使用包含这个标识符的提示词来描述实例图像：
*   **实例提示词 (Instance Prompt):** `a [V] [class noun]` (例如: `a sks dog`)

而描述类别图像时，则使用不包含标识符的提示词：
*   **类别提示词 (Class Prompt):** `a [class noun]` (例如: `a dog`)

通过这种方式，模型可以将稀有标识符 `[V]` 与特定主体的视觉特征关联起来，同时保持对类别名词 `[class noun]` 的一般性理解。

`find_rare_token` 函数就是用来从 tokenizer 的词汇表中找到这样一个合适的稀有词。

In [None]:
if train_mode:
    print("\n开始 DreamBooth 训练...")
    # 检查GPU并优化设置
    if torch.cuda.is_available():
        gpu_info = torch.cuda.get_device_name()
        gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1024**3
        print(f"\n已检测到GPU: {gpu_info} ({gpu_memory:.1f}GB)")
        
        # 根据GPU内存调整优化参数 (示例)
        if gpu_memory < 6 and prior_generation_samples > 30:  # 低端GPU
            print(f"警告: 检测到低显存GPU ({gpu_memory:.1f}GB)。当前先验样本数为 {prior_generation_samples}。")
            print("建议减少先验图像数量 (例如 prior_generation_samples = 30) 以避免内存不足。")
            # 可以选择自动调整或提示用户
            # prior_generation_samples = 30
            # print("已自动调整先验图像数量为 30")
        if gpu_memory < 8 and train_text_encoder:
             print(f"警告: 显存可能不足以同时训练文本编码器。如果遇到 OOM 错误，请尝试设置 train_text_encoder = False。")
    else:
        print("\n警告: 未检测到GPU! 训练将在CPU上运行，这会非常慢!")
        # 在Notebook中通常不强制退出，让用户决定是否继续
        # use_cpu = input("是否继续在CPU上训练? [y/n]: ").lower()
        # if use_cpu != 'y':
        #     print("已取消训练。请在GPU环境下运行。")
        #     # raise RuntimeError("需要GPU进行训练") # 或者抛出错误
            
    try:
        trained_identifier = dreambooth_training(
            pretrained_model_name_or_path=pretrained_model_name_or_path,
            instance_data_dir=instance_data_dir,
            output_dir=output_dir,
            class_prompt=class_prompt,
            instance_prompt_template=instance_prompt_template,
            learning_rate=learning_rate,
            max_train_steps=max_train_steps,
            prior_preservation_weight=prior_preservation_weight,
            prior_generation_samples=prior_generation_samples,
            train_text_encoder=train_text_encoder,
            seed=seed_train,
            mixed_precision=mixed_precision
        )
        print("\n训练完成！")
        print(f"模型已保存到: {output_dir}")
        print(f"学习到的标识符: {trained_identifier}")
        print(f"现在可以将 infer_mode 设置为 True，并使用提示词模板 '{prompt_template_infer.format(trained_identifier)}' 进行推理。")
    except Exception as e:
         print(f"\n训练过程中发生错误: {e}")
         import traceback
         traceback.print_exc()
         print("\n请检查错误信息、参数配置和依赖项安装。")

elif infer_mode:
    print("\n开始 DreamBooth 推理...")
    # 添加CUDA检测
    device = "cuda" if torch.cuda.is_available() else "cpu"
    if device == "cpu":
        print("\n警告: 未检测到可用的CUDA GPU。将使用CPU运行，这会非常慢!")
        # 在Notebook中通常不强制退出
    else:
        gpu_info = torch.cuda.get_device_name()
        gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1024**3
        print(f"\n成功检测到GPU: {gpu_info} ({gpu_memory:.1f}GB)")
        print(f"将使用设备: {device}")

    generated_images, saved_paths = inference(
        model_path=output_dir, # 推理时加载训练好的模型
        prompt_template=prompt_template_infer,
        class_prompt=class_prompt, # 用于查找 identifier.txt
        # identifier=None, # 通常让函数自动从文件加载
        output_image_path=output_image_path,
        num_images=num_images_to_generate,
        guidance_scale=guidance_scale,
        num_inference_steps=num_inference_steps,
        seed=seed_infer
    )
    
    if generated_images:
        print("\n推理完成！")
        # 可选：在Notebook中显示生成的图像
        from IPython.display import display
        for img_path in saved_paths:
             try:
                 display(Image.open(img_path))
             except Exception as display_e:
                  print(f"无法显示图像 {img_path}: {display_e}")
    else:
        print("\n推理失败。请检查错误信息。")

else:
    print("\n没有选择操作模式。请在配置单元格中设置 train_mode=True 或 infer_mode=True，然后重新运行该单元格和此执行单元格。")