## pbmc数据生成

In [None]:
import scanpy as sc
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split


# Build plain-text train/test records for bone-marrow cells: for every sampled
# cell, list its most highly expressed genes together with its annotated cell
# type and tissue.

TRAIN_SIZE = 30        # cells per cell type written to the training file
TEST_SIZE = 20         # cells per cell type written to the test file
MAX_CELL_TYPES = 15    # only the first 15 annotated cell types are used
N_TOP_GENES = 100      # number of genes listed per cell
TISSUE_LABEL = 'Bone_Marrow'


def expression_row_to_vector(row):
    """Return one cell's expression row as a flat 1-D NumPy array.

    Accepts either a sparse matrix (anything exposing ``toarray()``) or a
    dense array, so the script works regardless of how ``X`` is stored.
    """
    if hasattr(row, "toarray"):
        row = row.toarray()
    return np.asarray(row).ravel()


def format_top_genes_record(cell_expr, gene_names, cell_type, tissue, n_top=N_TOP_GENES):
    """Format one output line describing a cell by its top expressed genes.

    Args:
        cell_expr: 1-D array of expression values, one entry per gene.
        gene_names: sequence of gene names aligned with ``cell_expr``.
        cell_type: label written after ``Cell type:``.
        tissue: label written after ``Tissue:``.
        n_top: how many genes to list (highest expression first).

    Returns:
        A single newline-terminated record string.
    """
    # Sort ascending, reverse, then keep the first n_top indices.
    # NOTE: the cut is positional only, so a cell with fewer than n_top
    # expressed genes still gets n_top names (the tail has the lowest values).
    top_idx = np.argsort(cell_expr)[::-1][:n_top]
    top_genes = np.asarray(gene_names)[top_idx]
    return (
        f"Top {n_top} genes for this cell (highest expression first): "
        f"{', '.join(top_genes)}. Cell type: {cell_type}. Tissue: {tissue}.\n"
    )


# Guarded so the helpers above can be imported without loading the dataset;
# when run as a notebook cell or script, __name__ is "__main__" and the
# pipeline executes exactly as before.
if __name__ == "__main__":
    file_path = "/home/yu.zhiyin/CellRAG/pbmc_rare_0.003619.h5ad"
    adata = sc.read_h5ad(file_path)
    bone_marrow_cells = adata[adata.obs['tissue'] == 'Bone_Marrow']
    cell_types = bone_marrow_cells.obs['final_annotation'].unique()
    cell_types = cell_types[:MAX_CELL_TYPES]

    train_data = []
    test_data = []

    for cell_type in cell_types:
        cell_type_data = bone_marrow_cells[bone_marrow_cells.obs['final_annotation'] == cell_type]
        cell_indices = cell_type_data.obs.index

        if len(cell_indices) < TRAIN_SIZE + TEST_SIZE:
            print(f"Warning: Not enough samples for cell type {cell_type}. Skipping.")
            continue

        train_cells, test_cells = train_test_split(
            cell_indices, train_size=TRAIN_SIZE, test_size=TEST_SIZE, random_state=42
        )

        # Same record-building logic for both splits; only the target list differs.
        for split_cells, records in ((train_cells, train_data), (test_cells, test_data)):
            for cell_id in split_cells:
                # Expression vector of this single cell.
                cell_expr = expression_row_to_vector(cell_type_data[cell_id].X)
                records.append(
                    format_top_genes_record(cell_expr, adata.var_names, cell_type, TISSUE_LABEL)
                )

    with open('train_data.txt', 'w', encoding='utf-8') as f:
        f.writelines(train_data)

    with open('test_data.txt', 'w', encoding='utf-8') as f:
        f.writelines(test_data)

    print("数据生成完成")


## heart数据生成

In [None]:
import scanpy as sc
import numpy as np
from sklearn.model_selection import train_test_split

# Build plain-text train/test records from the heart atlas: for every tissue,
# sample a fixed number of cells and describe each one by its most highly
# expressed genes, its cell type and its tissue.

CELLS_PER_TISSUE = 125   # cells sampled from each tissue before splitting
TRAIN_SIZE = 75          # of those, cells written to the training file
TEST_SIZE = 50           # of those, cells written to the test file
N_TOP_GENES = 100        # number of genes listed per cell
RANDOM_SEED = 42         # shared by the subsampling and the train/test split


def expression_row_to_vector(row):
    """Return one cell's expression row as a flat 1-D NumPy array.

    Accepts either a sparse matrix (anything exposing ``toarray()``) or a
    dense array, so the script works regardless of how ``X`` is stored.
    """
    if hasattr(row, "toarray"):
        row = row.toarray()
    return np.asarray(row).ravel()


def format_top_genes_record(cell_expr, gene_names, cell_type, tissue, n_top=N_TOP_GENES):
    """Format one output line describing a cell by its top expressed genes.

    Args:
        cell_expr: 1-D array of expression values, one entry per gene.
        gene_names: sequence of gene names aligned with ``cell_expr``.
        cell_type: label written after ``Cell type:``.
        tissue: label written after ``Tissue:``.
        n_top: how many genes to list (highest expression first).

    Returns:
        A single newline-terminated record string.
    """
    # Sort ascending, reverse, then keep the first n_top indices.
    top_idx = np.argsort(cell_expr)[::-1][:n_top]
    top_genes = np.asarray(gene_names)[top_idx]
    return (
        f"Top {n_top} genes for this cell (highest expression first): "
        f"{', '.join(top_genes)}. Cell type: {cell_type}. Tissue: {tissue}.\n"
    )


# Guarded so the helpers above can be imported without loading the dataset;
# when run as a notebook cell or script, __name__ is "__main__" and the
# pipeline executes as before.
if __name__ == "__main__":
    file_path = r"E:\yzy\research\human_heart_atlas.h5ad"

    adata = sc.read_h5ad(file_path)
    tissue_counts = adata.obs.groupby('tissue').size()
    print(tissue_counts)

    tissues = adata.obs['tissue'].unique()

    # Gene display names are the same for every cell, so materialise them once.
    gene_names = np.asarray(adata.var['feature_name'])

    # Seeded generator: previously the subsample used the unseeded global
    # NumPy RNG, so the output files changed on every run even though the
    # subsequent split was seeded.
    rng = np.random.default_rng(RANDOM_SEED)

    train_data = []
    test_data = []

    for tissue in tissues:
        tissue_data = adata[adata.obs['tissue'] == tissue]
        cell_indices = tissue_data.obs.index

        if len(cell_indices) < CELLS_PER_TISSUE:
            print(f"Warning: Not enough samples for tissue {tissue}. Skipping.")
            continue

        selected_cells = rng.choice(np.asarray(cell_indices), CELLS_PER_TISSUE, replace=False)
        train_cells, test_cells = train_test_split(
            selected_cells, train_size=TRAIN_SIZE, test_size=TEST_SIZE, random_state=RANDOM_SEED
        )

        # Same record-building logic for both splits; only the target list differs.
        for split_cells, records in ((train_cells, train_data), (test_cells, test_data)):
            for cell_id in split_cells:
                # Expression vector of this single cell.
                cell_expr = expression_row_to_vector(tissue_data[cell_id].X)
                # Cell-type annotation of this cell.
                cell_type = tissue_data.obs.loc[cell_id, 'cell_type']
                # Every row of tissue_data carries this tissue label (see filter above).
                records.append(format_top_genes_record(cell_expr, gene_names, cell_type, tissue))

    with open('train_data_1.txt', 'w', encoding='utf-8') as f:
        f.writelines(train_data)

    with open('test_data_1.txt', 'w', encoding='utf-8') as f:
        f.writelines(test_data)

## pancreas数据生成

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# Build plain-text train/test records from the pancreas CSV export: for each
# configured cell type, sample cells and describe each one by its most highly
# expressed genes.

label_file = 'human_pancreas_data_label.csv'
expression_file = 'human_pancreas_data.csv'

N_TOP_GENES = 100  # number of genes listed per cell

# cell type -> (number of training cells, number of test cells).
# Commented-out entries are alternative configurations kept for reference.
cell_type_sample_sizes = {
    # 'acinar': (75, 50),
    # 'alpha': (75, 50),
    # 'beta': (75, 50),
    # 'delta': (75, 50),
    'endothelial': (2, 1),
    # 'gamma': (75, 50),
    # 'ductal': (95, 65),
}


def format_top_genes_record(cell_expr, gene_names, cell_type, tissue, n_top=N_TOP_GENES):
    """Format one output line describing a cell by its top expressed genes.

    Args:
        cell_expr: 1-D array of expression values, one entry per gene.
        gene_names: sequence of gene names aligned with ``cell_expr``.
        cell_type: label written after ``Cell type:``.
        tissue: label written after ``Tissue:``.
        n_top: how many genes to list (highest expression first).

    Returns:
        A single newline-terminated record string.
    """
    # Sort ascending, reverse, then keep the first n_top indices.
    top_idx = np.argsort(cell_expr)[::-1][:n_top]
    top_genes = np.asarray(gene_names)[top_idx]
    return (
        f"Top {n_top} genes for this cell (highest expression first): "
        f"{', '.join(top_genes)}. Cell type: {cell_type}. Tissue: {tissue}.\n"
    )


def collect_records(expression_data, cell_ids, cell_type, tissue='pancreas', n_top=N_TOP_GENES):
    """Build one record per cell id found among the expression-table columns.

    Args:
        expression_data: DataFrame with genes as the index and cell ids as columns.
        cell_ids: iterable of cell ids to describe.
        cell_type: label shared by all ``cell_ids``.
        tissue: tissue label written into each record.
        n_top: how many genes to list per cell.

    Returns:
        List of record strings; ids missing from ``expression_data`` are
        reported with a warning and skipped.
    """
    records = []
    for cell_id in cell_ids:
        if cell_id not in expression_data.columns:
            print(f"Warning: Cell ID {cell_id} not found in expression data. Skipping.")
            continue
        records.append(
            format_top_genes_record(
                expression_data[cell_id].values, expression_data.index, cell_type, tissue, n_top
            )
        )
    return records


# Guarded so the helpers above can be imported without the CSV files present;
# when run as a notebook cell or script, __name__ is "__main__" and the
# pipeline executes as before.
if __name__ == "__main__":
    labels = pd.read_csv(label_file)
    expression_data = pd.read_csv(expression_file, index_col=0)

    train_data = []
    test_data = []

    for cell_type, (train_size, test_size) in cell_type_sample_sizes.items():
        cell_type_cells = labels.loc[labels['celltype'] == cell_type, 'cell_id'].values

        if len(cell_type_cells) < train_size + test_size:
            print(f"Warning: Not enough samples for cell type {cell_type}. Skipping.")
            continue

        train_cells, test_cells = train_test_split(
            cell_type_cells, train_size=train_size, test_size=test_size, random_state=42
        )

        train_data.extend(collect_records(expression_data, train_cells, cell_type))
        test_data.extend(collect_records(expression_data, test_cells, cell_type))

    with open('train_data_2.txt', 'w', encoding='utf-8') as f:
        f.writelines(train_data)

    # Was 'test_data.txt_2' (misplaced suffix); renamed to match
    # 'train_data_2.txt'. Update any consumer that still reads the old name.
    with open('test_data_2.txt', 'w', encoding='utf-8') as f:
        f.writelines(test_data)

    print("Data processing complete.")

