In [2]:
#!/usr/bin/env python3  # 指定脚本由 python3 解释运行
import os  # 导入 os 模块以便进行文件和路径操作
import sys  # 导入 sys 模块以便处理系统相关操作
import math  # 导入 math 模块以便使用数学函数
import numpy as np  # 导入 numpy 并别名为 np 用于数值计算
import anndata  # 导入 anndata 用于读取和处理 .h5ad 文件
import scanpy as sc  # 导入 scanpy 用于 scRNA-seq 常用预处理
import scipy.sparse as sp  # 导入 scipy.sparse 以识别稀疏矩阵类型
import torch  # 导入 PyTorch 以便构建与训练模型
import torch.nn as nn  # 导入神经网络模块并命名为 nn
import torch.nn.functional as F  # 导入函数式接口并命名为 F
from torch.utils.data import Dataset, DataLoader  # 从 PyTorch 导入 Dataset 与 DataLoader
from sklearn.mixture import GaussianMixture  # 导入高斯混合模型用于聚类
from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score  # 导入 ARI 与 NMI 评估函数
import matplotlib.pyplot as plt  # 导入 matplotlib 用于绘图
from matplotlib import cm  # 导入 matplotlib 的 colormap 工具

  import pynvml  # type: ignore[import]


In [3]:
# device 选择：优先 CUDA:3，其次 MPS，否则 CPU
device = torch.device(  # 定义要使用的设备
    "cuda:3" if torch.cuda.is_available() else  # 如果 CUDA 可用则选择 cuda:3
    "mps" if torch.backends.mps.is_available() else  # 否则如果 MPS 可用则选择 mps
    "cpu"  # 否则退回到 CPU
)  # 结束 device 定义
print(f"✅ 模型训练使用设备：{device}")  # 打印将要使用的设备以便确认

✅ 模型训练使用设备：cuda:3


In [4]:
file_path = '/share/2025_undergraduate/20251106/setty_bone_marrow.h5ad'  # 指定输入 h5ad 文件路径
adata = anndata.read_h5ad(file_path)                          # 读取 h5ad 文件为 AnnData 对象
adata.var_names_make_unique()  # 确保基因名唯一以避免下标混淆
adata.obs_names_make_unique()  # 确保细胞名唯一以避免下标混淆
adata  # 显示 adata（Jupyter 中会展示对象摘要）

AnnData object with n_obs × n_vars = 5780 × 27876
    obs: 'clusters', 'palantir_pseudotime', 'palantir_diff_potential'
    var: 'palantir'
    uns: 'clusters_colors', 'palantir_branch_probs_cell_types'
    obsm: 'MAGIC_imputed_data', 'X_tsne', 'palantir_branch_probs'
    layers: 'spliced', 'unspliced'

In [5]:
# QC 基因注释：线粒体基因（人用 "MT-"，鼠用 "Mt-"）
adata.var["mt"] = adata.var_names.str.startswith("MT-")  # 标记以 MT- 开头的基因为线粒体基因
# 核糖体基因注释
adata.var["ribo"] = adata.var_names.str.startswith(("RPS", "RPL"))  # 标记以 RPS 或 RPL 开头的基因为核糖体基因
# 血红蛋白基因注释
adata.var["hb"] = adata.var_names.str.contains("^HB[^(P)]")  # 标记血红蛋白类基因（粗略匹配）

In [6]:
# 计算 QC 指标（会将结果写入 adata.obs），并对计数做 log1p（这里仅用于 QC 统计）
sc.pp.calculate_qc_metrics(adata, qc_vars=["mt", "ribo", "hb"], inplace=True, log1p=True)  # 计算 QC 指标并写入 adata.obs
# 过滤低质量细胞：至少 100 个基因
sc.pp.filter_cells(adata, min_genes=100)  # 过滤掉基因数少于 min_genes 的细胞
# 过滤低表达基因：至少在 3 个细胞中表达
sc.pp.filter_genes(adata, min_cells=3)  # 过滤掉在少于 min_cells 个细胞中表达的基因
# 使用 Scrublet 预测并过滤双细胞（注意：scanpy.scrublet 需要安装 scrublet 包并支持）
try:
    sc.pp.scrublet(adata)   # 尝试运行 Scrublet 双细胞检测（若 scanpy 版本支持）
except Exception:  # 如果 scrublet 不可用或调用失败
    pass  # 忽略 Scrublet 步骤（继续运行）

# 归一化到中位数总 counts（临时用于 HVG 检测）
sc.pp.normalize_total(adata)  # 将每个细胞的总 counts 缩放到同一水平（默认 target_sum=1e4）
# 对数化数据以便下游统计稳定
sc.pp.log1p(adata)  # 对数据执行 log1p 变换
# 计算高变基因（HVG），默认会在 adata.var['highly_variable'] 中标记
sc.pp.highly_variable_genes(adata)  # 识别高度可变基因
# 仅保留 HVG 子集以便后续分析（注意：训练 ZINB-VAE 时我们会使用原始计数矩阵的这些基因）
adata = adata[:,adata.var['highly_variable']==True]  # 根据 HVG 掩码对 adata 进行基因子集化

In [7]:
def set_seed(seed: int):  # 设置随机种子以提高可重复性
    np.random.seed(seed)  # 设置 numpy 随机种子
    torch.manual_seed(seed)  # 设置 PyTorch CPU 随机种子
    try:
        torch.cuda.manual_seed_all(seed)  # 如果有 CUDA 则设置所有 GPU 随机种子
    except Exception:
        pass  # 若设置失败则忽略

In [8]:
def sparse_to_dense_array(X):  # 将 scipy 稀疏矩阵转为 numpy 密集数组
    if sp.issparse(X):  # 如果输入为稀疏矩阵
        X = X.toarray()  # 转为密集数组
    return np.asarray(X)  # 返回 numpy ndarray

In [9]:
class CountsDataset(Dataset):  # 自定义 PyTorch Dataset，用于按批次提供原始计数与 size_factors
    def __init__(self, counts: np.ndarray, size_factors: np.ndarray):  # 构造函数，接收计数矩阵与对应的 size_factors
        assert counts.shape[0] == size_factors.shape[0]  # 确保细胞数一致
        self.counts = counts.astype(np.float32)  # 存储计数并转换为 float32
        self.size_factors = size_factors.astype(np.float32)  # 存储 size_factors 并转换为 float32
    def __len__(self):  # 返回样本数量（细胞数）
        return self.counts.shape[0]  # 返回计数矩阵的行数
    def __getitem__(self, idx):  # 返回第 idx 个样本（计数向量, size_factor）
        x = self.counts[idx]  # 获取第 idx 个细胞的计数向量
        sf = self.size_factors[idx]  # 获取对应的 size_factor
        return x, sf  # 返回计数和 size_factor

In [10]:
def zinb_negative_log_likelihood(x, mu, theta, pi_logits, eps=1e-8):  # 完整且数值稳定的 ZINB 负对数似然实现
    if theta.ndim == 1:  # 如果 theta 是 1D（每基因一个参数）
        theta = theta.unsqueeze(0).expand_as(mu)  # 扩展为与 mu 相同形状以便逐元素计算
    theta = torch.clamp(theta, min=eps)  # 限制 theta 最小值避免数值问题
    mu = torch.clamp(mu, min=eps)  # 限制 mu 最小值避免 log(0)
    # NB log pmf 计算（准确稳定的形式）
    lg1 = torch.lgamma(theta + x) - torch.lgamma(theta) - torch.lgamma(x + 1.0)  # Gamma 函数组合项
    lg2 = theta * (torch.log(theta + eps) - torch.log(mu + theta + eps)) + x * (torch.log(mu + eps) - torch.log(mu + theta + eps))  # 与 mu, theta 相关的对数项
    log_nb = lg1 + lg2  # 负二项分布的对数概率
    pi = torch.sigmoid(pi_logits)  # 将 logits 转换为零膨胀概率 pi
    # 处理零事件：log(pi + (1-pi) * exp(log_nb_zero)) 的稳定计算
    log_pi = torch.log(torch.clamp(pi, min=eps))  # log(pi)
    log_1_minus_pi = torch.log(torch.clamp(1 - pi, min=eps))  # log(1-pi)
    log_nb_zero = log_nb  # 当 x == 0 时 log_nb 即为 NB 在 0 的对数概率
    a = log_pi  # a = log(pi)
    b = log_1_minus_pi + log_nb_zero  # b = log(1-pi) + log_nb_zero
    max_ab = torch.maximum(a, b)  # 取最大值用于数值稳定的 log-sum-exp
    log_zinb_zero = max_ab + torch.log(torch.exp(a - max_ab) + torch.exp(b - max_ab) + eps)  # 稳定计算 log(pi + (1-pi)*exp(log_nb_zero))
    # 非零计数时的 log 概率
    log_zinb_nonzero = log_1_minus_pi + log_nb  # log((1-pi) * p_nb(x))
    # 选择零或非零的 log 概率
    log_prob = torch.where(x < 0.5, log_zinb_zero, log_zinb_nonzero)  # 对于 x < 0.5 视为 0
    nll = -log_prob  # 负对数似然
    return nll.sum(dim=1).mean()  # 对基因求和再对批次求平均并返回

In [29]:
class ZINBVAE(nn.Module):  # 完整的 ZINB-VAE 模型实现（编码器->潜变量->解码器输出 ZINB 参数）
    def __init__(self, n_genes: int, n_latent: int = 10, hidden_dims=(256,128), theta_init=1.0, dropout=0.2):  # 构造函数
        super().__init__()  # 调用父类构造函数
        self.n_genes = n_genes  # 保存基因数
        self.n_latent = n_latent  # 保存潜变量维度
        # 构建编码器 MLP
        enc_layers = []  # 存放编码器层的列表————
        input_dim = n_genes  # 编码器输入维度为基因数
        for h in hidden_dims:  # 以 hidden_dims 构建多层编码器
            enc_layers.append(nn.Linear(input_dim, h))  # 添加线性层
            enc_layers.append(nn.ReLU())  # 添加 ReLU 激活
            enc_layers.append(nn.Dropout(dropout))  # 添加 Dropout 做正则化
            input_dim = h  # 更新下一层输入维度
        self.encoder = nn.Sequential(*enc_layers)  # 将编码器层封装为 Sequential
        self.enc_mu = nn.Linear(input_dim, n_latent)  # 编码器输出 z 的均值
        self.enc_logvar = nn.Linear(input_dim, n_latent)  # 编码器输出 z 的对数方差
        # 构建解码器 MLP（对称结构）
        dec_layers = []  # 存放解码器隐藏层
        input_dim = n_latent  # 解码器输入维度为潜变量维度
        for h in reversed(hidden_dims):  # 逆序构建隐藏层以对称编码器
            dec_layers.append(nn.Linear(input_dim, h))  # 添加线性层
            dec_layers.append(nn.ReLU())  # 添加 ReLU 激活
            dec_layers.append(nn.Dropout(dropout))  # 添加 Dropout
            input_dim = h  # 更新下一层输入维度
        self.decoder_hidden = nn.Sequential(*dec_layers)  # 封装解码器隐藏层
        self.dec_mu = nn.Linear(input_dim, n_genes)  # 解码器输出 log_mu 的线性头（未结合 size_factors）
        self.dec_pi = nn.Linear(input_dim, n_genes)  # 解码器输出 pi logits 的线性头
        # 可学习的每基因 dispersion 参数 theta（初始化为 theta_init）
        self.theta = nn.Parameter(torch.ones(n_genes) * theta_init)  # 参数 theta，可学习
        
    def encode(self, x):  # 编码函数，x 已按 log1p(counts/size_factors) 变换
        h = self.encoder(x)  # 通过编码器获得隐层表示 h
        return self.enc_mu(h), self.enc_logvar(h)  # 返回 z 的均值与对数方差
    
    def reparameterize(self, mu, logvar):  # 重参数化采样函数
        std = torch.exp(0.5 * logvar)  # 计算标准差
        eps = torch.randn_like(std)  # 生成正态噪声
        return mu + eps * std  # 返回采样后的 z
    
    def decode(self, z, size_factors):  # 解码函数，返回 pi_logits、mu、theta
        h = self.decoder_hidden(z)  # 通过解码器隐藏层得到 h
        pi_logits = self.dec_pi(h)  # 计算 pi 的 logits（batch, genes）
        log_mu = self.dec_mu(h)  # 计算基因的 log_mu（batch, genes）
        sf_log = torch.log(torch.clamp(size_factors, min=1e-8)).unsqueeze(1)  # 计算 size_factors 的对数并扩展为 (batch,1)
        mu = torch.exp(log_mu + sf_log)  # 将 log_mu 与 size_factors 对数相加后 exponentiate 得到最终的 mu
        theta = F.softplus(self.theta) + 1e-4  # 用 softplus 确保 theta 为正并加上微小常数避免数值问题
        return pi_logits, mu, theta  # 返回解码得到的 ZINB 参数
    
    def forward(self, x, size_factors):  # 前向函数：计算 NLL、KL 并返回 z_mu
        sf = size_factors.unsqueeze(1)  # 扩展 size_factors 到 (batch,1)
        x_enc = torch.log1p(x / (sf + 1e-8))  # 使用 log1p(counts/size_factors) 作为编码器的输入变换
        z_mu, z_logvar = self.encode(x_enc)  # 得到 z 的均值与对数方差
        z = self.reparameterize(z_mu, z_logvar)  # 重参数化采样 z
        pi_logits, mu, theta = self.decode(z, size_factors)  # 解码得到 ZINB 参数
        nll = zinb_negative_log_likelihood(x, mu, theta, pi_logits)  # 计算 ZINB 的负对数似然
        kl = -0.5 * torch.sum(1 + z_logvar - z_mu.pow(2) - z_logvar.exp(), dim=1).mean()  # 计算 q(z|x) 与标准正态的 KL 并对批次均值化
        return nll, kl, z_mu  # 返回 NLL、KL、z 的均值表示

In [30]:
def train(model: ZINBVAE, dataloader: DataLoader, optimizer, device, n_epochs=200, kl_weight=1.0, verbose=True):  # 完整训练循环
    model.to(device)  # 将模型移动到指定设备
    for epoch in range(1, n_epochs + 1):  # 迭代每个 epoch
        model.train()  # 切换到训练模式（启用 dropout）
        epoch_nll = 0.0  # 累计 NLL
        epoch_kl = 0.0  # 累计 KL
        batch_count = 0  # 批次数计数器
        for x_batch, sf_batch in dataloader:  # 遍历数据加载器
            x_batch = torch.tensor(x_batch, device=device) if not isinstance(x_batch, torch.Tensor) else x_batch.to(device)  # 将批次计数送入设备
            sf_batch = torch.tensor(sf_batch, device=device) if not isinstance(sf_batch, torch.Tensor) else sf_batch.to(device)  # 将 size_factors 送入设备
            optimizer.zero_grad()  # 清零梯度
            nll, kl, _ = model(x_batch, sf_batch)  # 前向计算得到 NLL 与 KL
            loss = nll + kl_weight * kl  # 总损失 = NLL + kl_weight * KL
            loss.backward()  # 反向传播计算梯度
            optimizer.step()  # 更新参数
            epoch_nll += nll.item()  # 累加 NLL
            epoch_kl += kl.item()  # 累加 KL
            batch_count += 1  # 增加批次计数
        if verbose and (epoch % 10 == 0 or epoch == 1):  # 每 10 个 epoch 或第 1 个 epoch 打印一次信息
            print(f"Epoch {epoch}/{n_epochs}, avg_nll={epoch_nll/batch_count:.4f}, avg_kl={epoch_kl/batch_count:.4f}")  # 打印平均 NLL 与 KL
    return model  # 返回训练后的模型

In [13]:
def get_latent_embeddings(model: ZINBVAE, counts: np.ndarray, size_factors: np.ndarray, device, batch_size=256) -> np.ndarray:  # 计算所有细胞的 z_mu 嵌入
    model.to(device)  # 将模型移动到设备
    model.eval()  # 切换到评估模式禁用 dropout
    dataset = CountsDataset(counts, size_factors)  # 用 CountsDataset 包装数据
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)  # 创建推断用 DataLoader（不打乱）
    zs = []  # 用于收集 z_mu 的列表
    with torch.no_grad():  # 关闭梯度计算以节省显存
        for x_batch, sf_batch in dataloader:  # 遍历每个批次
            x_batch = torch.tensor(x_batch, device=device) if not isinstance(x_batch, torch.Tensor) else x_batch.to(device)  # 转为 tensor 并移动到设备
            sf_batch = torch.tensor(sf_batch, device=device) if not isinstance(sf_batch, torch.Tensor) else sf_batch.to(device)  # 转为 tensor 并移动到设备
            x_enc = torch.log1p(x_batch / (sf_batch.unsqueeze(1) + 1e-8))  # 计算 encoder 输入变换
            z_mu, _ = model.encode(x_enc)  # 通过编码器获取 z 的均值
            zs.append(z_mu.cpu().numpy())  # 将结果移回 CPU 并转为 numpy 后追加
    zs = np.vstack(zs)  # 将所有批次的嵌入垂直堆叠为数组
    return zs  # 返回嵌入矩阵

In [32]:
def run_full_from_prelude(adata_prelude, raw_h5ad_path, device, out_h5ad=None, latent_dim=10, batch_size=128, n_epochs=200, lr=1e-3, seed=42, n_top_genes=2000):  # 主流程函数签名
    set_seed(seed)  # 设置随机种子以保证可重复性
    # 1) 确保 prelude 中的 adata 是经过 HVG 筛选的子集（前置代码已完成该步）
    adata_hvg = adata_prelude  # 将前置 adata 视为 HVG 子集
    # 2) 从原始 h5ad 中提取这些 HVG 对应的原始计数（未归一化 / 未 log1p）
    raw_adata = anndata.read_h5ad(raw_h5ad_path)  # 读取原始 h5ad 以获取原始计数
    raw_adata.var_names_make_unique()  # 确保基因名唯一
    raw_adata.obs_names_make_unique()  # 确保细胞名唯一
    hvgs = list(adata_hvg.var_names)  # 获取 HVG 列表
    if set(hvgs).issubset(set(raw_adata.var_names)):  # 若 HVG 为原始数据子集
        hvgs_idx = [list(raw_adata.var_names).index(g) for g in hvgs]  # 在原始 var 中找到索引
        raw_counts = raw_adata.X[:, hvgs_idx]  # 提取 HVG 的原始计数矩阵
    else:
        common_genes = [g for g in hvgs if g in raw_adata.var_names]  # 计算交集基因
        if len(common_genes) == 0:  # 若无交集则报错
            raise ValueError("前置 adata 的基因与原始 h5ad 无重叠，请检查基因命名一致性")  # 提示用户检查基因名
        hvgs_idx = [list(raw_adata.var_names).index(g) for g in common_genes]  # 找到交集基因索引
        raw_counts = raw_adata.X[:, hvgs_idx]  # 提取对应计数
        adata_hvg = adata_hvg[:, common_genes]  # 更新 prelude adata 以对齐基因

    # 3) 确保 raw_counts 为密集矩阵并为 float32
    if sp.issparse(raw_counts):  # 如果原始计数是稀疏格式
        raw_counts = raw_counts.toarray()  # 转成密集数组
    raw_counts = np.asarray(raw_counts, dtype=np.float32)  # 转为 float32 ndarray
    if np.any(np.isnan(raw_counts)):  # 如果存在 NaN
        raise ValueError("原始计数中存在 NaN，请检查输入文件")  # 提示并中断
    if (raw_counts < 0).any():  # 如果存在负值
        raise ValueError("原始计数中存在负值，请检查输入文件")  # 提示并中断

    # 4) 计算 size_factors（library size），并归一化到几何平均为 1（常用做法）
    total_counts = raw_counts.sum(axis=1)  # 每个细胞的总计数
    total_counts[total_counts <= 0] = 1.0  # 避免零或负值导致除零
    size_factors = total_counts / np.exp(np.mean(np.log(total_counts + 1e-8)))  # 使用几何平均归一化得到 size_factors
    size_factors = size_factors.astype(np.float32)  # 转为 float32

    # 5) 构建 DataLoader
    dataset = CountsDataset(raw_counts, size_factors)  # 使用 CountsDataset 包装原始计数与 size_factors
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, drop_last=False)  # 训练 DataLoader（打乱数据）

    # 6) 初始化完整 ZINB-VAE 并设置优化器
    n_genes = raw_counts.shape[1]  # 基因数
    model = ZINBVAE(n_genes=n_genes, n_latent=latent_dim, hidden_dims=(256,128), theta_init=1.0, dropout=0.2)  # 初始化模型
    model.to(device)  # 将模型移动到前置选择的设备
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)  # 使用 Adam 优化器

    # 7) 训练模型（注意：绝不可使用 adata_hvg.obs['cell_type'] 参与训练）
    model = train(model, dataloader, optimizer, device, n_epochs=n_epochs, kl_weight=1.0, verbose=True)  # 执行训练

    # 8) 推断全部细胞的潜在表示 z_mu 并写回 adata_hvg.obsm
    embeddings = get_latent_embeddings(model, raw_counts, size_factors, device, batch_size=256)  # 获取嵌入
    adata_hvg.obsm['X_zinbvae'] = embeddings  # 将嵌入写回 prelude 的 adata（HVG 子集）

    # 9) 聚类（可以使用真实标签的类别数来设置簇数，但真实标签不得用于训练）
  # 确认用于评估的真实标签列：优先使用 'cell_type'，其次尝试 'clusters'，再尝试常见备选项
    if 'cell_type' in adata_hvg.obs:
        label_key = 'cell_type'  # 优先使用 cell_type
    elif 'clusters' in adata_hvg.obs:
        label_key = 'clusters'  # 回退使用 clusters（你提示中 labels 在 obs 的 clusters 列）
    else:
        # 尝试一些常见的备选列名
        fallback_candidates = ['celltype', 'cell_type_pred', 'label', 'labels', 'true_labels']
        label_key = None
        for cand in fallback_candidates:
            if cand in adata_hvg.obs:
                label_key = cand
                break
        if label_key is None:
            # 若没有找到任何候选列，则给出明确错误提示并列出可用的 obs 列
            available = ", ".join(list(adata_hvg.obs.columns))
            raise KeyError(
                "未在 adata.obs 中找到用于评估的真实标签列（例如 'cell_type' 或 'clusters'）。"
                f" 请检查你的 AnnData，当前可用的 obs 列有: {available}"
            )
    # 使用选定的标签列来确定簇数和构建 true_labels
    true_labels = adata_hvg.obs[label_key].values.astype(str)  # 取出真实标签（字符串形式）
    n_clusters = int(adata_hvg.obs[label_key].nunique())  # 使用真实标签的唯一值数作为聚类数
    print(f"使用 adata.obs['{label_key}'] 作为真实标签用于评估与确定簇数。")  # 打印所用列名以便确认
    print(f"将使用 {n_clusters} 个簇对嵌入进行聚类（GaussianMixture）")  # 打印聚类信息
    gmm = GaussianMixture(n_components=n_clusters, covariance_type='full', random_state=112)  # 初始化 GMM
    pred_labels = gmm.fit_predict(embeddings)  # 对嵌入执行聚类并获得预测簇标签
    adata_hvg.obs['zinbvae_cluster'] = [str(int(x)) for x in pred_labels]  # 将预测簇标签存回 adata.obs（字符串形式）

    # 10) 评估：计算 ARI 与 NMI（使用真实标签，但仅用于评估）
    true_labels = adata_hvg.obs['clusters'].values.astype(str)  # 获取真实标签数组并转为字符串
    ari = adjusted_rand_score(true_labels, pred_labels)  # 计算调整后的兰德指数（ARI）
    nmi = normalized_mutual_info_score(true_labels, pred_labels, average_method='arithmetic')  # 计算归一化互信息（NMI）
    print("聚类评估结果（基于 adata.obs['cell_type']）：")  # 说明评估基准
    print(f"Adjusted Rand Index (ARI): {ari:.4f}")  # 打印 ARI
    print(f"Normalized Mutual Information (NMI): {nmi:.4f}")  # 打印 NMI
    
        # -------------------------
    # 可视化（加入的位置：就在打印 ARI / NMI 之后，在保存 adata 之前）
    # 说明：这里优先尝试使用 umap-learn；如果不可用则退回到 sklearn.manifold.TSNE
    # -------------------------
    try:
        import umap as _umap  # 尝试导入 umap-learn
        reducer = _umap.UMAP(n_components=2, random_state=42)  # 使用 UMAP 将高维嵌入降到 2 维
        coords_2d = reducer.fit_transform(embeddings)  # 计算 2D 投影
    except Exception:
        from sklearn.manifold import TSNE  # 若 umap 不可用则使用 t-SNE 作为后备
        reducer = TSNE(n_components=2, random_state=42, init='pca')  # 初始化 t-SNE
        coords_2d = reducer.fit_transform(embeddings)  # 计算 2D 投影

    # 存储 2D 投影到 adata.obsm，便于后续使用
    adata_hvg.obsm['X_umap'] = coords_2d  # 将 2D 坐标保存为 X_umap（命名为 umap 方便与 scanpy 接口兼容）

    # 绘制两个子图：左侧为预测簇标签，右侧为真实细胞类型
    fig, axes = plt.subplots(1, 2, figsize=(14, 6))  # 创建画布和两个子图
    # 颜色映射 - predicted clusters
    uniq_pred = np.unique(pred_labels)  # 预测簇的唯一值
    cmap_pred = cm.get_cmap('tab20', len(uniq_pred))  # 使用 tab20 colormap（或根据簇数量自动扩展）
    pred_color_idxs = np.array([np.where(uniq_pred == v)[0][0] for v in pred_labels])  # 将簇标签映射到颜色索引
    axes[0].scatter(coords_2d[:, 0], coords_2d[:, 1], c=cmap_pred(pred_color_idxs), s=6, linewidths=0)  # 绘制散点图
    axes[0].set_title('ZINB-VAE clusters (predicted)')  # 子图标题
    axes[0].axis('off')  # 关闭坐标轴以便更清晰展示

    # 颜色映射 - true labels
    uniq_true = np.unique(true_labels)  # 真实标签的唯一值
    cmap_true = cm.get_cmap('tab20', len(uniq_true))  # 使用 tab20 colormap
    true_color_idxs = np.array([np.where(uniq_true == v)[0][0] for v in true_labels])  # 将真实标签映射为颜色索引
    axes[1].scatter(coords_2d[:, 0], coords_2d[:, 1], c=cmap_true(true_color_idxs), s=6, linewidths=0)  # 绘制散点图
    axes[1].set_title('True cell types')  # 子图标题
    axes[1].axis('off')  # 关闭坐标轴

    plt.tight_layout()  # 自动调整子图间距
    # 保存可视化图片到与输入 h5ad 相同目录下
    vis_path = os.path.splitext(raw_h5ad_path)[0] + "_zinbvae_clusters_umap.png"  # 构造输出图片路径
    plt.savefig(vis_path, dpi=150, bbox_inches='tight')  # 保存图片文件
    plt.close(fig)  # 关闭 figure 以释放内存
    print(f"已保存聚类可视化图片到: {vis_path}")  # 打印可视化文件路径

    # 11) 可选：保存包含嵌入与聚类标签的 AnnData 文件
    if out_h5ad is None:  # 如果用户未指定输出路径
        out_h5ad = os.path.splitext(raw_h5ad_path)[0] + "_zinbvae_full_out.h5ad"  # 生成默认输出文件名
    adata_hvg.write(out_h5ad)  # 将包含嵌入与簇标签的 AnnData 写入磁盘
    print(f"已将包含嵌入与簇标签的 AnnData 保存到: {out_h5ad}")  # 打印保存路径
    return adata_hvg, model, ari, nmi  # 返回结果以便进一步分析或测试

In [35]:
if __name__ == "__main__":  # 如果脚本作为主程序运行则执行下面流程
    # 直接使用前置定义的 adata 与 file_path（前置代码已将它们加载）
    adata_result, trained_model, ari_value, nmi_value = run_full_from_prelude(
        adata_prelude=adata,  # 前置步骤处理后的 AnnData（HVG 子集）
        raw_h5ad_path=file_path,  # 原始 h5ad 文件路径以提取原始计数
        device=device,  # 使用你前置选择的设备
        out_h5ad=None,  # 使用默认输出文件名（可修改为你想要的路径）
        latent_dim=10,  # 潜变量维度（可根据需要调整）
        batch_size=128,  # 训练批次大小（可调整）
        n_epochs=500,  # 训练轮数（可调整）
        lr=1e-4,  # 学习率（可调整）
        seed = 42  # 随机种子以保证可重复性
    )  # 执行完整流程并返回结果（包括已保存的 adata）

Epoch 1/500, avg_nll=1152.6655, avg_kl=0.9922
Epoch 10/500, avg_nll=795.0070, avg_kl=4.0031
Epoch 20/500, avg_nll=777.5748, avg_kl=4.3845
Epoch 30/500, avg_nll=756.2423, avg_kl=5.9987
Epoch 40/500, avg_nll=748.7731, avg_kl=5.9784
Epoch 50/500, avg_nll=744.2017, avg_kl=6.1457
Epoch 60/500, avg_nll=739.9522, avg_kl=6.5028
Epoch 70/500, avg_nll=735.4031, avg_kl=6.5612
Epoch 80/500, avg_nll=733.8824, avg_kl=6.5912
Epoch 90/500, avg_nll=730.8128, avg_kl=6.5701
Epoch 100/500, avg_nll=731.4570, avg_kl=6.6804
Epoch 110/500, avg_nll=730.2116, avg_kl=6.6004
Epoch 120/500, avg_nll=728.1441, avg_kl=6.6634
Epoch 130/500, avg_nll=726.6465, avg_kl=6.6367
Epoch 140/500, avg_nll=726.9549, avg_kl=6.6911
Epoch 150/500, avg_nll=725.9713, avg_kl=6.6061
Epoch 160/500, avg_nll=725.5042, avg_kl=6.6588
Epoch 170/500, avg_nll=724.5407, avg_kl=6.6075
Epoch 180/500, avg_nll=723.3138, avg_kl=6.6369
Epoch 190/500, avg_nll=722.8658, avg_kl=6.6247
Epoch 200/500, avg_nll=721.8961, avg_kl=6.7770
Epoch 210/500, avg_nll=

  warn(
  cmap_pred = cm.get_cmap('tab20', len(uniq_pred))  # 使用 tab20 colormap（或根据簇数量自动扩展）
  cmap_true = cm.get_cmap('tab20', len(uniq_true))  # 使用 tab20 colormap


已保存聚类可视化图片到: /share/2025_undergraduate/20251106/setty_bone_marrow_zinbvae_clusters_umap.png
已将包含嵌入与簇标签的 AnnData 保存到: /share/2025_undergraduate/20251106/setty_bone_marrow_zinbvae_full_out.h5ad


In [None]:
adata.obsm['MAGIC_imputed_data']

array([[ 2.5806e-01,  1.4061e-02,  4.6783e-02, ..., -2.5238e-02,
         8.0273e-01, -1.2256e-01],
       [ 6.6101e-02, -1.6998e-02,  2.4014e-03, ..., -9.7227e-04,
        -4.3872e-01,  1.7419e-01],
       [-3.0908e-01, -1.1774e-01, -3.9795e-02, ..., -3.7292e-02,
        -9.7607e-01,  8.2947e-02],
       ...,
       [ 2.1936e-01,  1.9177e-01,  1.6266e-02, ..., -3.9077e-04,
         2.1399e-01, -4.4983e-02],
       [ 5.7861e-01,  2.1362e-01,  6.8726e-02, ..., -3.8300e-02,
         1.0107e+00, -3.6255e-01],
       [ 2.1790e-01,  1.6309e-01,  8.5983e-03, ...,  4.0863e-02,
         2.4246e-02,  1.6998e-02]], shape=(5780, 14319), dtype=float16)

In [19]:
-3.6255e-01

-0.36255

In [20]:
-3.6255e-02

-0.036255

In [22]:
-3.6255e-01

-0.36255