# MorphoDiff 完整流程：从预处理到训练到推理

本笔记本包含了 MorphoDiff 的完整流程，包括：
1. 环境设置和依赖安装
2. 数据预处理
3. 扰动编码（Perturbation Encoding）
4. 模型训练
5. 图像生成和推理
6. 结果评估

MorphoDiff 是一个基于扩散模型的生成管道，能够基于扰动编码预测不同条件下的高分辨率细胞形态响应。

## 1. 环境设置和依赖安装

In [None]:
# 检查CUDA可用性
import torch
import os
import sys

print(f"Python 版本: {sys.version}")
print(f"PyTorch 版本: {torch.__version__}")
print(f"CUDA 可用: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA 设备数量: {torch.cuda.device_count()}")
    print(f"当前CUDA设备: {torch.cuda.current_device()}")
    print(f"设备名称: {torch.cuda.get_device_name()}")

In [None]:
# 安装必要的包（如果尚未安装）
import subprocess
import sys

def install_package(package):
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])

# 基础依赖
packages = [
    "accelerate",
    "diffusers",
    "transformers",
    "datasets",
    "wandb",
    "Pillow",
    "numpy",
    "pandas",
    "torch",
    "torchvision",
    "tqdm",
    "matplotlib",
    "seaborn"
]

print("检查并安装必要的包...")
for package in packages:
    try:
        __import__(package.replace("-", "_"))
        print(f"✓ {package} 已安装")
    except ImportError:
        print(f"安装 {package}...")
        install_package(package)
        print(f"✓ {package} 安装完成")

In [None]:
# 导入所有必要的库
import os
import sys
import json
import math
import time
import shutil
import argparse
import subprocess
from pathlib import Path
from datetime import datetime
from typing import Optional

import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from PIL import Image
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.auto import tqdm

# Diffusers和HuggingFace库
from diffusers import (
    AutoencoderKL, 
    DDPMScheduler, 
    StableDiffusionPipeline, 
    UNet2DConditionModel
)
from diffusers.optimization import get_scheduler
from transformers import AutoFeatureExtractor
from datasets import load_dataset
from accelerate import Accelerator
from accelerate.utils import set_seed

# 设置随机种子
set_seed(42)

print("所有库导入成功！")

## 2. 配置路径和参数

在这里设置所有重要的路径和参数。根据您的具体设置调整这些路径。

In [None]:
# ===== 配置参数 =====
# 根据您的环境调整这些路径

# 基础路径配置
PROJECT_ROOT = "/home/runner/work/MorphoDiff/MorphoDiff"  # 项目根目录
MORPHODIFF_ROOT = os.path.join(PROJECT_ROOT, "morphodiff")  # morphodiff 代码目录

# 数据路径（需要根据实际情况调整）
DATA_ROOT = "/tmp/morphodiff_data"  # 临时数据目录
DATASET_NAME = "BBBC021"  # 数据集名称
TRAIN_DATA_DIR = os.path.join(DATA_ROOT, f"{DATASET_NAME}/train_imgs/")  # 训练数据目录

# 模型和检查点路径
MODEL_CHECKPOINTS_DIR = os.path.join(DATA_ROOT, "checkpoints")
PRETRAINED_MODEL = "CompVis/stable-diffusion-v1-4"  # 预训练模型
OUTPUT_DIR = os.path.join(MODEL_CHECKPOINTS_DIR, f"{DATASET_NAME}-MorphoDiff")

# 生成图像输出路径
GENERATED_IMAGES_DIR = os.path.join(DATA_ROOT, f"{DATASET_NAME}/generated_imgs/")

# 日志路径
LOG_DIR = os.path.join(DATA_ROOT, "logs")

# 创建必要的目录
os.makedirs(DATA_ROOT, exist_ok=True)
os.makedirs(TRAIN_DATA_DIR, exist_ok=True)
os.makedirs(MODEL_CHECKPOINTS_DIR, exist_ok=True)
os.makedirs(GENERATED_IMAGES_DIR, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)

# 训练参数
TRAINING_CONFIG = {
    "resolution": 512,
    "train_batch_size": 4,  # 根据GPU内存调整
    "gradient_accumulation_steps": 8,
    "max_train_steps": 500,  # 演示用的较小步数
    "learning_rate": 1e-5,
    "lr_scheduler": "constant",
    "lr_warmup_steps": 0,
    "validation_epochs": 50,
    "checkpointing_steps": 100,
    "seed": 42
}

# 验证的扰动ID
VALIDATION_PROMPTS = ["cytochalasin-d", "docetaxel", "epothilone-b"]

print(f"项目根目录: {PROJECT_ROOT}")
print(f"数据目录: {DATA_ROOT}")
print(f"训练数据目录: {TRAIN_DATA_DIR}")
print(f"输出目录: {OUTPUT_DIR}")
print(f"生成图像目录: {GENERATED_IMAGES_DIR}")
print("\n配置完成！")

## 3. 数据预处理

这一部分演示如何准备数据，包括图像预处理和创建 metadata.jsonl 文件。

In [None]:
# 创建示例数据集结构
# 在实际使用中，您需要从 BBBC021、RxRx1 或其他数据集下载真实数据

def create_sample_dataset():
    """创建示例数据集用于演示"""
    
    # 创建一些示例图像
    sample_compounds = [
        "cytochalasin-d", "docetaxel", "epothilone-b", 
        "camptothecin", "taxol", "leupeptin"
    ]
    
    metadata_entries = []
    
    for i, compound in enumerate(sample_compounds):
        # 创建示例图像（实际使用中应该是真实的细胞图像）
        for j in range(3):  # 每个化合物3张图像
            # 生成随机彩色图像作为示例
            image_data = np.random.randint(0, 256, (512, 512, 3), dtype=np.uint8)
            image = Image.fromarray(image_data)
            
            # 保存图像
            image_filename = f"{compound}_{j:03d}.png"
            image_path = os.path.join(TRAIN_DATA_DIR, image_filename)
            image.save(image_path)
            
            # 创建metadata条目
            metadata_entry = {
                "file_name": image_filename,
                "additional_feature": compound,  # 扰动ID
                "image": image_filename
            }
            metadata_entries.append(metadata_entry)
    
    # 保存metadata.jsonl文件
    metadata_path = os.path.join(TRAIN_DATA_DIR, "metadata.jsonl")
    with open(metadata_path, 'w') as f:
        for entry in metadata_entries:
            f.write(json.dumps(entry) + '\n')
    
    print(f"创建了 {len(metadata_entries)} 个训练样本")
    print(f"图像保存在: {TRAIN_DATA_DIR}")
    print(f"元数据保存在: {metadata_path}")
    
    return metadata_entries

# 创建示例数据集
sample_metadata = create_sample_dataset()

# 显示数据集信息
print("\n数据集信息:")
df = pd.DataFrame(sample_metadata)
print(df.groupby('additional_feature').size())
print(f"\n总共 {len(df)} 张图像")

In [None]:
# 验证数据集格式
def verify_dataset_format(data_dir):
    """验证数据集格式是否符合要求"""
    
    metadata_path = os.path.join(data_dir, "metadata.jsonl")
    
    if not os.path.exists(metadata_path):
        print("❌ metadata.jsonl 文件不存在")
        return False
    
    # 读取metadata
    metadata = []
    with open(metadata_path, 'r') as f:
        for line in f:
            metadata.append(json.loads(line.strip()))
    
    print(f"✓ 找到 {len(metadata)} 个元数据条目")
    
    # 检查必需的字段
    required_fields = ['file_name', 'additional_feature', 'image']
    for field in required_fields:
        if field not in metadata[0]:
            print(f"❌ 缺少必需字段: {field}")
            return False
        print(f"✓ 找到字段: {field}")
    
    # 检查图像文件是否存在
    missing_files = 0
    for entry in metadata[:5]:  # 只检查前5个文件
        image_path = os.path.join(data_dir, entry['file_name'])
        if not os.path.exists(image_path):
            missing_files += 1
    
    if missing_files > 0:
        print(f"❌ 发现 {missing_files} 个缺失的图像文件")
        return False
    
    print("✓ 图像文件检查通过")
    
    # 显示扰动分布
    perturbations = [entry['additional_feature'] for entry in metadata]
    perturbation_counts = pd.Series(perturbations).value_counts()
    print(f"\n扰动分布:")
    print(perturbation_counts)
    
    return True

# 验证数据集
is_valid = verify_dataset_format(TRAIN_DATA_DIR)
print(f"\n数据集格式验证: {'✓ 通过' if is_valid else '❌ 失败'}")

## 4. 扰动编码（Perturbation Encoding）

这一部分处理扰动编码，包括化学化合物和基因扰动的编码。

In [None]:
# 复制扰动编码器
perturbation_encoder_path = os.path.join(MORPHODIFF_ROOT, "perturbation_encoder.py")
local_perturbation_encoder = os.path.join(DATA_ROOT, "perturbation_encoder.py")

# 复制文件到工作目录
shutil.copy2(perturbation_encoder_path, local_perturbation_encoder)

# 复制required_file目录
required_files_src = os.path.join(MORPHODIFF_ROOT, "required_file")
required_files_dst = os.path.join(DATA_ROOT, "required_file")

if os.path.exists(required_files_src):
    shutil.copytree(required_files_src, required_files_dst, dirs_exist_ok=True)
    print("✓ 复制了扰动编码文件")
else:
    print("❌ 找不到required_file目录")

# 将工作目录添加到Python路径
sys.path.insert(0, DATA_ROOT)

In [None]:
# 创建示例扰动编码文件
def create_sample_perturbation_embeddings():
    """创建示例扰动编码文件"""
    
    # 示例化合物
    compounds = [
        "cytochalasin-d", "docetaxel", "epothilone-b", 
        "camptothecin", "taxol", "leupeptin"
    ]
    
    # 生成随机嵌入向量（实际使用中应该使用RDKit等工具生成真实的化学特征）
    embedding_dim = 200  # 嵌入维度
    
    # 创建DataFrame
    data = {"compound": compounds}
    
    # 添加潜在变量列
    for i in range(1, embedding_dim + 1):
        # 生成随机嵌入（实际应该是有意义的化学特征）
        data[f"latent_{i}"] = np.random.randn(len(compounds))
    
    df = pd.DataFrame(data)
    
    # 保存文件
    embedding_file = os.path.join(required_files_dst, "perturbation_embedding_bbbc021.csv")
    df.to_csv(embedding_file, index=False)
    
    print(f"✓ 创建了扰动编码文件: {embedding_file}")
    print(f"包含 {len(compounds)} 个化合物，每个有 {embedding_dim} 维特征")
    
    return df

# 创建示例编码
embedding_df = create_sample_perturbation_embeddings()
print("\n前几行编码数据:")
print(embedding_df.iloc[:3, :6])  # 显示前3行和前6列

In [None]:
# 创建扰动列表文件（用于推理）
def create_perturbation_list():
    """创建扰动列表文件用于图像生成"""
    
    compounds = [
        "cytochalasin-d", "docetaxel", "epothilone-b", 
        "camptothecin", "taxol", "leupeptin"
    ]
    
    # 创建扰动信息DataFrame
    perturbation_data = {
        "perturbation": compounds,
        "ood": [False] * len(compounds)  # 假设都不是out-of-distribution
    }
    
    df = pd.DataFrame(perturbation_data)
    
    # 保存文件
    pert_list_file = os.path.join(required_files_dst, f"{DATASET_NAME}_pert_ood_info.csv")
    df.to_csv(pert_list_file, index=False)
    
    print(f"✓ 创建了扰动列表文件: {pert_list_file}")
    print(df)
    
    return pert_list_file

# 创建扰动列表
perturbation_list_file = create_perturbation_list()

## 5. 模型训练

这一部分演示如何配置和启动MorphoDiff的训练过程。

In [None]:
# 创建训练配置
def create_training_config():
    """创建训练配置文件"""
    
    config = {
        # 模型配置
        "pretrained_model_name_or_path": PRETRAINED_MODEL,
        "naive_conditional": "conditional",  # MorphoDiff使用条件生成
        
        # 数据配置
        "train_data_dir": TRAIN_DATA_DIR,
        "dataset_id": DATASET_NAME,
        "image_column": "image",
        "caption_column": "additional_feature",
        
        # 训练配置
        "resolution": TRAINING_CONFIG["resolution"],
        "train_batch_size": TRAINING_CONFIG["train_batch_size"],
        "gradient_accumulation_steps": TRAINING_CONFIG["gradient_accumulation_steps"],
        "max_train_steps": TRAINING_CONFIG["max_train_steps"],
        "learning_rate": TRAINING_CONFIG["learning_rate"],
        "lr_scheduler": TRAINING_CONFIG["lr_scheduler"],
        "lr_warmup_steps": TRAINING_CONFIG["lr_warmup_steps"],
        
        # 验证和保存
        "validation_epochs": TRAINING_CONFIG["validation_epochs"],
        "validation_prompts": ",".join(VALIDATION_PROMPTS),
        "checkpointing_steps": TRAINING_CONFIG["checkpointing_steps"],
        "output_dir": OUTPUT_DIR,
        
        # 其他配置
        "enable_xformers_memory_efficient_attention": True,
        "random_flip": True,
        "use_ema": True,
        "gradient_checkpointing": True,
        "mixed_precision": "fp16",
        "seed": TRAINING_CONFIG["seed"],
        "logging_dir": LOG_DIR,
        "report_to": None,  # 不使用wandb以简化演示
    }
    
    return config

training_config = create_training_config()
print("训练配置:")
for key, value in training_config.items():
    print(f"  {key}: {value}")

In [None]:
# 创建训练脚本调用函数
def run_training(config, dry_run=True):
    """运行训练脚本"""
    
    # 构建训练命令
    train_script = os.path.join(MORPHODIFF_ROOT, "train.py")
    
    cmd = [
        "accelerate", "launch", 
        "--mixed_precision=fp16",
        train_script
    ]
    
    # 添加所有配置参数
    for key, value in config.items():
        if isinstance(value, bool):
            if value:
                cmd.append(f"--{key}")
        else:
            cmd.extend([f"--{key}", str(value)])
    
    print("训练命令:")
    print(" ".join(cmd))
    
    if dry_run:
        print("\n这是一个演示运行（dry_run=True）。要实际运行训练，请设置 dry_run=False")
        print("注意：实际训练需要大量的计算资源和时间")
        return None
    else:
        print("\n开始训练...")
        # 实际运行训练
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, cwd=DATA_ROOT)
            if result.returncode == 0:
                print("训练完成！")
                print(result.stdout)
            else:
                print("训练出错：")
                print(result.stderr)
            return result
        except Exception as e:
            print(f"运行训练时出错: {e}")
            return None

# 演示训练调用（不实际运行）
result = run_training(training_config, dry_run=True)

## 6. 模型推理和图像生成

这一部分演示如何使用训练好的模型生成图像。在实际使用中，您需要有一个训练好的检查点。

In [None]:
# 创建图像生成配置
def create_generation_config():
    """创建图像生成配置"""
    
    config = {
        "experiment": DATASET_NAME,
        "model_checkpoint": OUTPUT_DIR,  # 训练后的检查点路径
        "model_name": "SD",  # 固定为SD
        "model_type": "conditional",  # MorphoDiff使用条件生成
        "vae_path": OUTPUT_DIR,  # VAE路径
        "perturbation_list_address": perturbation_list_file,
        "gen_img_path": GENERATED_IMAGES_DIR,
        "num_imgs": 5,  # 每个扰动生成5张图像
        "ood": False
    }
    
    return config

generation_config = create_generation_config()
print("图像生成配置:")
for key, value in generation_config.items():
    print(f"  {key}: {value}")

In [None]:
# 演示图像生成过程（使用预训练的Stable Diffusion）
def demo_image_generation():
    """演示图像生成过程"""
    
    print("演示图像生成过程...")
    print("注意：这个演示使用预训练的Stable Diffusion模型")
    print("在实际使用中，您需要使用训练好的MorphoDiff检查点")
    
    try:
        # 加载预训练的Stable Diffusion管道
        pipe = StableDiffusionPipeline.from_pretrained(
            "CompVis/stable-diffusion-v1-4",
            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
        )
        
        if torch.cuda.is_available():
            pipe = pipe.to("cuda")
        
        # 生成示例图像
        sample_prompts = [
            "cell morphology with cytochalasin treatment",
            "cell morphology with docetaxel treatment",
            "cell morphology with epothilone treatment"
        ]
        
        generated_images = []
        
        for i, prompt in enumerate(sample_prompts):
            print(f"生成图像 {i+1}/{len(sample_prompts)}: {prompt}")
            
            # 生成图像
            with torch.autocast("cuda" if torch.cuda.is_available() else "cpu"):
                image = pipe(prompt, num_inference_steps=20, guidance_scale=7.5).images[0]
            
            # 保存图像
            image_path = os.path.join(GENERATED_IMAGES_DIR, f"demo_{i:03d}.png")
            image.save(image_path)
            generated_images.append(image)
            
            print(f"✓ 保存图像: {image_path}")
        
        # 显示生成的图像
        fig, axes = plt.subplots(1, len(generated_images), figsize=(15, 5))
        if len(generated_images) == 1:
            axes = [axes]
        
        for i, (image, prompt) in enumerate(zip(generated_images, sample_prompts)):
            axes[i].imshow(image)
            axes[i].set_title(prompt[:30] + "...", fontsize=10)
            axes[i].axis('off')
        
        plt.tight_layout()
        plt.show()
        
        print("\n✓ 图像生成演示完成！")
        
        # 清理内存
        del pipe
        torch.cuda.empty_cache() if torch.cuda.is_available() else None
        
    except Exception as e:
        print(f"图像生成演示出错: {e}")
        print("可能是由于内存不足或模型下载问题")

# 运行图像生成演示
demo_image_generation()

In [None]:
# 创建图像生成脚本调用函数
def run_image_generation(config, dry_run=True):
    """运行图像生成脚本"""
    
    # 构建生成命令
    generation_script = os.path.join(MORPHODIFF_ROOT, "evaluation", "generate_img.py")
    
    cmd = ["python", generation_script]
    
    # 添加所有配置参数
    for key, value in config.items():
        cmd.extend([f"--{key}", str(value)])
    
    print("图像生成命令:")
    print(" ".join(cmd))
    
    if dry_run:
        print("\n这是一个演示运行（dry_run=True）。要实际运行图像生成，请设置 dry_run=False")
        print("注意：实际图像生成需要训练好的MorphoDiff检查点")
        return None
    else:
        print("\n开始生成图像...")
        # 实际运行图像生成
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, cwd=DATA_ROOT)
            if result.returncode == 0:
                print("图像生成完成！")
                print(result.stdout)
            else:
                print("图像生成出错：")
                print(result.stderr)
            return result
        except Exception as e:
            print(f"运行图像生成时出错: {e}")
            return None

# 演示图像生成调用（不实际运行）
result = run_image_generation(generation_config, dry_run=True)

## 7. 结果评估

这一部分演示如何评估生成的图像质量。

In [None]:
# 图像质量评估函数
def evaluate_generated_images(image_dir):
    """评估生成的图像"""
    
    print(f"评估目录中的图像: {image_dir}")
    
    # 查找所有图像文件
    image_files = []
    for ext in ['*.png', '*.jpg', '*.jpeg']:
        image_files.extend(Path(image_dir).glob(ext))
    
    if not image_files:
        print("未找到图像文件")
        return
    
    print(f"找到 {len(image_files)} 个图像文件")
    
    # 基础统计
    image_stats = []
    
    for image_path in image_files[:6]:  # 只处理前6个图像
        try:
            image = Image.open(image_path)
            image_array = np.array(image)
            
            stats = {
                'filename': image_path.name,
                'size': image.size,
                'mode': image.mode,
                'mean_intensity': np.mean(image_array),
                'std_intensity': np.std(image_array),
                'min_intensity': np.min(image_array),
                'max_intensity': np.max(image_array)
            }
            image_stats.append(stats)
            
        except Exception as e:
            print(f"处理图像 {image_path} 时出错: {e}")
    
    # 显示统计信息
    if image_stats:
        df_stats = pd.DataFrame(image_stats)
        print("\n图像统计信息:")
        print(df_stats)
        
        # 可视化图像
        fig, axes = plt.subplots(2, 3, figsize=(15, 10))
        axes = axes.flatten()
        
        for i, image_path in enumerate(image_files[:6]):
            try:
                image = Image.open(image_path)
                axes[i].imshow(image)
                axes[i].set_title(image_path.name, fontsize=10)
                axes[i].axis('off')
            except Exception as e:
                axes[i].text(0.5, 0.5, f'Error: {e}', ha='center', va='center')
                axes[i].set_title(image_path.name, fontsize=10)
        
        plt.tight_layout()
        plt.show()
    
    return image_stats

# 评估生成的图像
evaluation_results = evaluate_generated_images(GENERATED_IMAGES_DIR)

## 8. 完整流程总结

这个笔记本演示了MorphoDiff的完整流程：

In [None]:
# 流程总结
def summarize_pipeline():
    """总结整个流程"""
    
    summary = {
        "1. 环境设置": "✓ 完成",
        "2. 数据预处理": "✓ 完成 - 创建了示例数据集",
        "3. 扰动编码": "✓ 完成 - 创建了示例编码文件",
        "4. 模型训练": "演示模式 - 显示了训练命令",
        "5. 图像生成": "✓ 完成 - 使用预训练模型演示",
        "6. 结果评估": "✓ 完成 - 基础图像统计"
    }
    
    print("=" * 50)
    print("MorphoDiff 流程总结")
    print("=" * 50)
    
    for step, status in summary.items():
        print(f"{step}: {status}")
    
    print("\n" + "=" * 50)
    print("下一步操作:")
    print("=" * 50)
    
    next_steps = [
        "1. 准备真实的细胞图像数据集 (BBBC021, RxRx1, 等)",
        "2. 使用RDKit或scGPT生成真实的扰动编码",
        "3. 配置accelerate（运行 'accelerate config'）",
        "4. 运行完整训练（设置 dry_run=False）",
        "5. 使用训练好的检查点生成图像",
        "6. 使用CellProfiler进行下游分析"
    ]
    
    for step in next_steps:
        print(step)
    
    print("\n" + "=" * 50)
    print("重要文件位置:")
    print("=" * 50)
    
    important_files = {
        "项目根目录": PROJECT_ROOT,
        "训练数据": TRAIN_DATA_DIR,
        "扰动编码": os.path.join(required_files_dst, "perturbation_embedding_bbbc021.csv"),
        "扰动列表": perturbation_list_file,
        "模型输出": OUTPUT_DIR,
        "生成图像": GENERATED_IMAGES_DIR,
        "日志目录": LOG_DIR
    }
    
    for name, path in important_files.items():
        exists = "✓" if os.path.exists(path) else "❌"
        print(f"{name}: {path} {exists}")

summarize_pipeline()

## 9. 实际使用指南

要在实际项目中使用这个流程：

### 数据准备
1. 下载真实数据集（BBBC021、RxRx1等）
2. 按照ImageFolder格式组织数据
3. 创建metadata.jsonl文件

### 扰动编码
1. 对于化学化合物：使用RDKit生成分子描述符
2. 对于基因扰动：使用scGPT生成基因嵌入
3. 将编码保存为CSV格式

### 训练
1. 配置accelerate：`accelerate config`
2. 调整训练参数（批大小、学习率等）
3. 运行训练脚本

### 推理
1. 使用训练好的检查点
2. 准备扰动列表
3. 生成图像

### 评估
1. 使用CellProfiler提取特征
2. 计算图像质量指标
3. 进行下游分析