## A script to simulate and cluster single-cell transcriptome data.

## step1. simulating single cell transcriptome data.

In [None]:
import numpy as np
import pandas as pd
import torch
from scipy.stats import lognorm, poisson
from umap import UMAP
import matplotlib.pyplot as plt
import seaborn as sns

def generate_realistic_scRNAseq_data(n_cells=40000, n_genes=2000, n_clusters=5,n_housekeeping=20, seed=None):
    """Simulate a normalised single-cell expression table with cluster labels.

    Every cell expresses a shared set of housekeeping genes plus a random
    subset of its cluster's marker genes; a per-cell dropout then zeroes 60%
    of all genes, the matrix is multiplied element-wise by Poisson(1) draws
    and each row is rescaled to sum to 1e6.

    Args:
        n_cells: Total number of cells (rows). When it is not a multiple of
            ``n_clusters`` the first ``n_cells % n_clusters`` clusters get
            one extra cell, so exactly ``n_cells`` rows are always returned.
        n_genes: Number of genes (columns).
        n_clusters: Number of cell populations, labelled ``cell0``, ``cell1``, ...
        n_housekeeping: Number of genes expressed by every population.
        seed: Optional integer seed. ``None`` (default) draws from the global
            NumPy random state, as this function always did.

    Returns:
        pandas.DataFrame with columns ``Gene_0 .. Gene_{n_genes-1}`` and a
        final ``Cell_Label`` column. Rows whose counts are all zero after
        dropout/noise stay all-zero instead of becoming NaN.

    Raises:
        ValueError: If no gene is left for cluster markers once the
            housekeeping genes have been set aside.
    """
    n_marker_genes = 50      # marker genes per cluster (upper bound)
    min_expressed = 15       # fewest marker genes a single cell expresses
    dropout_fraction = 0.6   # share of genes zeroed in every cell

    if seed is None:
        # Keep using the global NumPy state so that callers relying on
        # np.random.seed(...) observe the same stream as before.
        rand, scipy_state = np.random, None
    else:
        rand = scipy_state = np.random.RandomState(seed)

    # Split cells over clusters; the remainder goes to the first clusters so
    # that labels and data always have the same number of rows.
    base, extra = divmod(n_cells, n_clusters)
    cluster_sizes = [base + (1 if i < extra else 0) for i in range(n_clusters)]
    cluster_names = ['cell%s' % (i) for i in range(n_clusters)]

    # 1. Empty expression matrix (n_cells x n_genes) and per-cell labels.
    data = np.zeros((n_cells, n_genes))
    labels = np.repeat(cluster_names, cluster_sizes)

    # 2. Gene groups: housekeeping genes shared by all clusters, and marker
    #    genes drawn (without duplicates) from the remaining genes.
    common_genes = rand.choice(n_genes, n_housekeeping, replace=False)
    candidates = np.setdiff1d(np.arange(n_genes), common_genes)
    if candidates.size == 0:
        raise ValueError("n_housekeeping leaves no genes for cluster-specific markers")
    n_spec = min(n_marker_genes, candidates.size)
    cluster_specific = {
        name: rand.choice(candidates, n_spec, replace=False)
        for name in cluster_names
    }

    # Each cell expresses between `low` and `high - 1` of its marker genes
    # (15..49 with the default 50 markers).
    low = min(min_expressed, n_spec)
    high = max(n_spec, low + 1)
    n_dropout = int(n_genes * dropout_fraction)

    # 3. Fill in the expression pattern cluster by cluster.
    start = 0
    for name, size in zip(cluster_names, cluster_sizes):
        end = start + size

        # Housekeeping genes are expressed in every cell.
        data[start:end, common_genes] = lognorm.rvs(
            s=1, scale=10, size=(size, n_housekeeping), random_state=scipy_state
        )

        spec_genes = cluster_specific[name]
        for cell_idx in range(start, end):
            n_expr = rand.randint(low, high)
            expressed = rand.choice(spec_genes, n_expr, replace=False)

            # Half of the expressed markers are lowly expressed, the rest highly.
            n_low = n_expr // 2
            low_expr = lognorm.rvs(s=1, scale=5, size=n_low, random_state=scipy_state)
            high_expr = lognorm.rvs(s=1, scale=1e3, size=n_expr - n_low, random_state=scipy_state)
            data[cell_idx, expressed] = np.concatenate([low_expr, high_expr])

            # Dropout: a random 60% of all genes are set to zero for this cell.
            zeros = rand.choice(n_genes, n_dropout, replace=False)
            data[cell_idx, zeros] = 0
        start = end

    # 4. Multiplicative Poisson(1) noise, then scale each cell to 1e6 total.
    data = data * poisson.rvs(1, size=data.shape, random_state=scipy_state)
    totals = data.sum(axis=1, keepdims=True)
    # Divide only where the total is positive; empty cells remain all-zero.
    data = np.divide(data, totals, out=np.zeros_like(data), where=totals > 0) * 1e6

    # 5. Assemble the labelled DataFrame.
    df = pd.DataFrame(data, columns=[f'Gene_{i}' for i in range(n_genes)])
    df['Cell_Label'] = labels

    return df

##visualization
def visualize_with_umap(X,Ylabel,savepath='/XYFS01/HDD_POOL/sysu_mhou/sysu_mhou_1/deep_learning_book/practices/decoder_only_SingleCellTranscriptome/example/figs',figname='Simu_cell_type_umap.png'):
    """Embed cells in 2-D with UMAP and show a scatter plot coloured by label.

    Args:
        X: Expression matrix; ``np.log1p`` is applied before the embedding.
        Ylabel: One label per row of ``X``, used to colour the points.
        savepath: Target directory for the figure. Currently unused: the
            figure is only shown, not written to disk.
        figname: Target file name for the figure. Currently unused.
    """
    # No random seed is set on the reducer: fixing one would disable the
    # parallel execution requested through n_jobs.
    log_expression = np.log1p(X)
    coords = UMAP(n_components=2, n_jobs=4).fit_transform(log_expression)

    # One colour per distinct label, in sorted label order.
    classes = np.unique(Ylabel)
    colours = sns.color_palette("tab20", n_colors=len(classes))

    plt.figure(figsize=(10, 8))
    sns.scatterplot(
        x=coords[:, 0],
        y=coords[:, 1],
        hue=Ylabel,
        hue_order=classes,
        palette=colours,
        s=10,
        alpha=0.6,
    )

    # Titles, axis labels and legend.
    plt.title('UMAP Visualization of Synthetic scRNA-seq Data', fontsize=20)
    plt.xlabel('UMAP1', fontsize=18)
    plt.ylabel('UMAP2', fontsize=18)
    plt.legend(title='Cell Type', title_fontsize=16, fontsize=16, loc='lower right')
    plt.show()



In [None]:
if __name__ == "__main__":
    # Simulate five cell populations that share 40 housekeeping genes.
    sc_data = generate_realistic_scRNAseq_data(n_housekeeping=40, n_clusters=5)

    # Write the simulated table (expression columns plus Cell_Label) to CSV.
    sc_data.to_csv(
        '/XYFS01/HDD_POOL/sysu_mhou/sysu_mhou_1/deep_learning_book/practices/decoder_only_SingleCellTranscriptome/example/scRNAseq_SimuData.csv',
        index=False,
    )

    # Plot the cells in UMAP space, coloured by their simulated label.
    expression_values = sc_data.drop(columns=['Cell_Label']).values
    cell_labels = sc_data['Cell_Label']
    visualize_with_umap(X=expression_values, Ylabel=cell_labels)

## step2. training a transformer (decoder-only) model to learn cell representation.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class SingleCellDecoder(nn.Module):
    """Transformer-decoder network that reconstructs gene-expression vectors.

    The input is projected linearly from ``input_dim`` to ``hidden_dim``,
    passed through a stack of ``nn.TransformerDecoderLayer`` blocks and
    projected back to ``input_dim``. There is no encoder: the decoder's
    ``memory`` argument is an all-zero tensor shaped like the embedded input.

    Args:
        input_dim: Number of genes per cell (input and output width).
        hidden_dim: Model width used inside the transformer.
        num_layers: Number of stacked decoder layers.
        nhead: Number of attention heads per layer.
        dropout: Dropout probability inside each decoder layer.
    """

    def __init__(
        self,
        input_dim: int = 20000,
        hidden_dim: int = 512,
        num_layers: int = 2,
        nhead: int = 4,
        dropout: float = 0.1,
    ):
        super().__init__()

        # Gene expression -> hidden representation.
        self.embedding = nn.Linear(input_dim, hidden_dim)

        # batch_first=True makes tensors (batch, seq_len, features) instead
        # of the default (seq_len, batch, features).
        self.transformer_decoder = nn.TransformerDecoder(
            nn.TransformerDecoderLayer(
                d_model=hidden_dim,
                nhead=nhead,
                dropout=dropout,
                batch_first=True,
            ),
            num_layers=num_layers,
        )

        # Hidden representation -> reconstructed gene expression.
        self.output_layer = nn.Linear(hidden_dim, input_dim)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Reconstruct expression profiles.

        Args:
            x: Tensor of shape (batch_size, seq_len, input_dim).

        Returns:
            Tensor of shape (batch_size, seq_len, input_dim).
        """
        hidden = self.embedding(x)  # (batch_size, seq_len, hidden_dim)

        # No encoder output exists, so an all-zero tensor stands in as memory.
        zero_memory = torch.zeros_like(hidden)
        decoded = self.transformer_decoder(tgt=hidden, memory=zero_memory)

        return self.output_layer(decoded)



In [None]:
import torch
import torch.nn as nn
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset

# 1. Prepare the data. sc_data is a DataFrame of shape (n_cells, n_genes + 1)
#    whose last column holds the cell-type label.
X = sc_data.iloc[:, :-1].values  # gene expression, (n_cells, n_genes)
y = sc_data.iloc[:, -1].values    # cell-type labels (strings such as 'cell0')
n_genes = X.shape[1]

# Convert to a PyTorch tensor and add a sequence dimension: (n_cells, 1, n_genes).
X_tensor = torch.FloatTensor(X).unsqueeze(1)

# 2. Split into train and test sets (8:2), stratified by cell type.
X_train, X_test, y_train, y_test = train_test_split(
    X_tensor, y, test_size=0.2, random_state=42, stratify=y
)

# 3. Build the data loaders.
# The labels are strings, which torch cannot turn into a LongTensor, so map
# each cell type to an integer code first. y_train / y_test keep the
# original string labels.
cell_types = sorted(set(y))
label_to_index = {name: idx for idx, name in enumerate(cell_types)}
y_train_codes = torch.tensor([label_to_index[name] for name in y_train], dtype=torch.long)
y_test_codes = torch.tensor([label_to_index[name] for name in y_test], dtype=torch.long)

batch_size = 100
train_dataset = TensorDataset(X_train, y_train_codes)
test_dataset = TensorDataset(X_test, y_test_codes)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# 4. Initialise the model.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = SingleCellDecoder(input_dim=n_genes).to(device)

# 5. Loss function and optimiser (expression-reconstruction task).
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# 6. Training loop.
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0

    # Mini-batch training; labels are ignored (unsupervised reconstruction).
    for batch_X, _ in train_loader:
        batch_X = batch_X.to(device)

        # Forward pass: the model is trained to reproduce its own input.
        reconstructed = model(batch_X)
        loss = criterion(reconstructed, batch_X)

        # Backward pass and parameter update.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Weight by batch size so the epoch mean is per-sample.
        train_loss += loss.item() * batch_X.size(0)

    # Mean training loss over the epoch.
    train_loss /= len(train_loader.dataset)

    # Evaluate on the held-out set.
    model.eval()
    test_loss = 0.0
    with torch.no_grad():
        for batch_X, _ in test_loader:
            batch_X = batch_X.to(device)
            reconstructed = model(batch_X)
            test_loss += criterion(reconstructed, batch_X).item() * batch_X.size(0)
    test_loss /= len(test_loader.dataset)

    print(f'Epoch {epoch+1}/{num_epochs} | Train Loss: {train_loss:.4f} | Test Loss: {test_loss:.4f}')

# 7. Save the trained weights.
torch.save(model.state_dict(), 'sc_decoder.pth')

## step 3. model evaluation