In [1]:
import numpy as np
import pandas as pd
from scipy.stats import pearsonr
from sklearn.utils import resample
import scanpy as sc
import re
import json
from multiprocessing import Pool, cpu_count
from functools import partial

In [2]:
newmarkergenes = pd.read_excel("./markergenes.xlsx",header=None)
reference_data = sc.read_text('./MTG18GluN_TPM4mapping.txt')#genes*cells
mata_data = pd.read_csv('./meta_data.txt',sep='\t')

In [3]:
patch_data = sc.read_text('./VEN4mapping.txt')

In [4]:
patch_meta = pd.read_csv('./VEN4mapping_meta_data.txt',sep='\t')

In [5]:
patch_data.var['Cell name'] = patch_data.var.index
patch_meta.index.name = 'Cell name'
patch_data.var = patch_data.var.reset_index().merge(patch_meta, on='Cell name', how='left').set_index('index')
patch_data.var.index.name = None

In [6]:
reference_data.var['sample_name'] = reference_data.var.index
# mata_data.index.name = 'sample_name'
reference_data.var = reference_data.var.reset_index().merge(mata_data, on='sample_name', how='left').set_index('index')
reference_data.var.index.name = None

In [7]:
# 创建一个字典以存储每个细胞类型的所有父类
parent_dict = {}
all_parents_dict = {}

# 遍历数据框构建父类字典
for index, row in newmarkergenes.iterrows():
    child_type = row[0]
    parent_type = row[1]
    
    # 初始化子类型的父类列表
    if child_type not in parent_dict:
        parent_dict[child_type] = []

    # 将父类添加到字典中
    parent_dict[child_type].append(parent_type)

# 定义一个函数用于查找所有父类
def find_all_parents(cell_type, parent_dict):
    all_parents = []
    current_type = cell_type
    
    # 遍历父类直到没有父类为止
    while current_type in parent_dict:
        parents = parent_dict[current_type]
        all_parents.extend(parents)
        current_type = parents[0] if parents else None  # 只取第一个父类
    
    return list(set(all_parents))  # 去重

# 将每个细胞类型的所有父类存储到 all_parents_dict 中
for cell_type in parent_dict.keys():
    all_parents_dict[cell_type] = find_all_parents(cell_type, parent_dict)

# 将每个细胞类型的所有父类存储到 reference_data.var 中
reference_data.var['all_parents'] = reference_data.var['cluster'].apply(
    lambda x: find_all_parents(x, parent_dict)
)

In [8]:
# 获取所有行标签
row_labels = patch_data.var_names.to_list()

# 定义前缀的排序顺序
prefix_order = ['cVEN', 'ETPC', 'ETPC2', 'PC', 'TRI']

# 定义排序函数：先根据前缀排序，再根据数字排序
def sort_key(label):
    # 提取前缀
    prefix = next((p for p in prefix_order if label.startswith(p)), '')
    # 提取数字部分，若无数字则为 0
    number = int(re.findall(r'\d+', label)[0]) if re.findall(r'\d+', label) else 0
    # 返回前缀顺序和数字，确保先按前缀排序，再按数字排序
    return (prefix_order.index(prefix), number)

# 根据自定义的排序函数对行标签进行排序
sorted_rows = sorted(row_labels, key=sort_key)

In [9]:
patch_data = patch_data[:,sorted_rows]

In [10]:
deep_layers_dict = {}

for index, row in newmarkergenes.iterrows():
    # 第一个元素作为外层字典的键
    outer_key = row[0]
    # 第二个元素作为内层字典的键
    inner_key = row[1]
    # 剩余元素组成列表
    values_list = row[2:].tolist()
    
    # 创建内层字典
    if outer_key not in deep_layers_dict:
        deep_layers_dict[outer_key] = {}
    
    deep_layers_dict[outer_key][inner_key] = values_list



In [11]:
# 定义树的节点 
class TreeNode:
    def __init__(self, name, marker_genes=None, children=None):
        self.name = name
        self.marker_genes = marker_genes if marker_genes else []  # 默认是空列表
        self.children = children if children else []  # 默认是空列表

    def is_leaf(self):
        return len(self.children) == 0

    def add_child(self, child_node):
        """给当前节点添加一个子节点"""
        self.children.append(child_node)

    def __repr__(self, level=0):
        """递归打印树结构"""
        ret = "\t" * level + repr(self.name) + "\n"
        if self.marker_genes:
            ret += "\t" * (level + 1) + "Marker genes: " + repr(self.marker_genes) + "\n"
        for child in self.children:
            ret += child.__repr__(level + 1)
        return ret


def build_tree_from_dict(marker_genes_dict, root_name):
    """根据 marker_genes_dict 构建树，根节点为 root_name"""
    # 创建节点的映射，key是节点名称，value是TreeNode对象
    nodes = {}

    # 遍历字典，创建所有节点但不连接它们
    for node_name, data in marker_genes_dict.items():
        parent_name, marker_genes = list(data.items())[0]  # 提取父节点名和 marker genes
        # 如果节点还没有创建，创建它
        if node_name not in nodes:
            nodes[node_name] = TreeNode(node_name, marker_genes=marker_genes)
        else:
            nodes[node_name].marker_genes = marker_genes

        # 如果父节点还没有创建，创建它
        if parent_name not in nodes:
            nodes[parent_name] = TreeNode(parent_name)

        # 将当前节点作为父节点的子节点
        nodes[parent_name].add_child(nodes[node_name])

    # 返回根节点
    return nodes[root_name]

In [12]:
# 计算细胞转录组与参考数据之间的皮尔逊相关性
def compute_correlation(patch_data, reference_data, marker_genes,celltype):
    """
    计算一个细胞转录组与参考细胞类型之间基于指定标记基因的皮尔逊相关性。
    """
    #自助抽样70%的参考数据和标记基因
    target_type = celltype
        # 定义一个函数检查父类中是否包含目标类型
    def contains_target(cell_type):
        parents = all_parents_dict.get(cell_type, [])
        return target_type in parents
    
    if celltype in reference_data.var['cluster'].unique():
        new_reference_data =reference_data[:, reference_data.var['cluster']==celltype]
    else:
        new_reference_data = reference_data[:,reference_data.var['cluster'].apply(contains_target)]
        
    
    # if celltype in ['Deep Layers', 'Superficial Layers']:
    #     new_reference_data =reference_data[:, reference_data.var['level3']==celltype]
    # else:
    #     new_reference_data =reference_data[:, reference_data.var['cluster']==celltype]

    
    bootstrapped_ref_data = new_reference_data[:, resample(range(new_reference_data.n_vars), replace=False, n_samples=int(0.7 * new_reference_data.n_vars))]
    
    # 过滤掉 a 列表中不在 patch_data.obs_names 中的元素
    filter_marker_genes = [gene for gene in marker_genes if gene in patch_data.obs_names and gene in reference_data.obs_names]


    bootstrapped_marker_genes = resample(filter_marker_genes, replace=False, n_samples=int(0.7 * len(filter_marker_genes)))

    # 从AnnData对象中提取细胞的基因表达数据
    cell_expr = patch_data[bootstrapped_marker_genes,:].X.T    
    # ref_expr = reference_data[bootstrapped_marker_genes, reference_data.var['cluster']==celltype].X.T

    ref_expr = bootstrapped_ref_data[bootstrapped_marker_genes, :].X.T
    # 计算每个参考细胞类型与当前细胞的皮尔逊相关性
    correlations = []
    
    for ref_cell in ref_expr:
        if len(cell_expr.flatten()) <= 1 or len(ref_cell) <= 1:
            correlations.append(np.nan)
        elif np.all(cell_expr.flatten() == cell_expr.flatten()[0]) or np.all(ref_cell == ref_cell[0]):
            correlations.append(np.nan)
        else:
            corr, _ = pearsonr(cell_expr.flatten(), ref_cell)
            correlations.append(corr)
    # 过滤掉 NaN
    correlations = [x for x in correlations if not np.isnan(x)]
    # print(celltype,np.mean(correlations))
    # 返回相关性数组
    return np.array(correlations)

In [13]:
def map_cell_type(patch_data, reference_data, node):
    best_match = node.name  # 初始化最佳匹配为当前节点
    best_correlation = float('-inf')  # 初始化最佳相关性为负无穷

    # 遍历子节点，找到相关性最高的子节点
    best_child = None
    for child in node.children:
        child_correlation = np.mean(compute_correlation(patch_data, reference_data, child.marker_genes, child.name))
        if child_correlation > best_correlation:
            best_child = child
            best_correlation = child_correlation

    # 如果找到最佳子节点，则递归查找它的孩子
    if best_child:
        return map_cell_type(patch_data, reference_data, best_child)

    # 如果没有子节点，返回当前节点和最佳相关性
    return best_match, best_correlation


In [14]:
def bootstrap_mapping(patch_data, reference_data, n_iterations=100):
    cell_type_counts = {}
    for _ in range(n_iterations):

            # 手动定义根节点 'Glutamatergic neuron'
            root_name = 'n7'

            # 构建树
            root = build_tree_from_dict(deep_layers_dict, root_name)
            # root = build_tree(deep_layers_dict)


    #         # 将细胞转录组映射到树上
            best_cell_type, best_correlation = map_cell_type(patch_data, reference_data, root)
            ########################################################################################################################################
            # 统计每种细胞类型的映射次数
            if best_cell_type not in cell_type_counts:
                cell_type_counts[best_cell_type] = 0
            cell_type_counts[best_cell_type] += 1
            # print(cell_type_counts)
    # 计算映射概率
    total_mappings = sum(cell_type_counts.values())
    mapping_probabilities = {cell_type: count / total_mappings for cell_type, count in cell_type_counts.items()}
    return mapping_probabilities


In [15]:
# # 逐个细胞进行映射
# def map_all_cells(patch_data, reference_data):
#     """
#     对cell_transcriptome中的每个细胞进行逐一映射，并返回映射结果。
#     """
#     results = {}

#     # 遍历cell_transcriptome中的每个细胞
#     for cell in patch_data.var_names:
#         # 获取当前细胞的转录组数据
#         single_cell_data = patch_data[:,patch_data.var_names==cell]
        
#         # 映射该细胞到最相关的细胞类型
#         mapped_cell_type = bootstrap_mapping(single_cell_data, reference_data)
        
#         # 保存映射结果
#         results[cell] = mapped_cell_type
#         print(results)

#         # 保存为 JSON 文件
#         with open('./result/mapping3/celltype_scoressmall2.json', 'w') as json_file:
#             json.dump(results, json_file, indent=4)
#     return results

# 将process_cell函数移到全局作用域
def process_cell(cell, patch_data, reference_data):
    """处理单个细胞的函数，现在在全局作用域"""
    single_cell_data = patch_data[:, patch_data.var_names == cell]
    mapped_cell_type = bootstrap_mapping(single_cell_data, reference_data)
    return cell, mapped_cell_type

def map_all_cells(patch_data, reference_data):
    """
    并行化版本的对所有细胞进行映射的函数
    保持原函数名不变，固定使用10个核心
    """
    # 固定使用10个核心
    n_cores = 20
    
    # 用于存储最终结果的字典
    results = {}
    
    # 创建进程池，限制为10个核心
    with Pool(n_cores) as pool:
        # 使用partial固定patch_data和reference_data参数
        from functools import partial
        process_cell_partial = partial(process_cell, 
                                     patch_data=patch_data, 
                                     reference_data=reference_data)
        
        # 使用imap按顺序获取结果
        for i, (cell, mapped_cell_type) in enumerate(pool.imap(process_cell_partial, patch_data.var_names)):
            results[cell] = mapped_cell_type
            print(f"已完成细胞 {cell} 的映射")
            
            # 每处理10个细胞保存一次中间结果
            if (i + 1) % 10 == 0:
                with open('./result/mapping/celltype_scoressmall2.json', 'w') as f:
                    json.dump(results, f, indent=4)
                print(f"已保存前{i+1}个细胞的结果")
    
    # 最终保存所有结果
    with open('./result/mapping/celltype_scoressmall2.json', 'w') as f:
        json.dump(results, f, indent=4)
    
    return results


In [None]:
cell_mapping_results = map_all_cells(patch_data, reference_data)

已完成细胞 cVEN01 的映射
已完成细胞 cVEN02 的映射
已完成细胞 cVEN03 的映射
已完成细胞 cVEN04 的映射
已完成细胞 cVEN05 的映射
已完成细胞 cVEN06 的映射
已完成细胞 cVEN07 的映射
已完成细胞 cVEN08 的映射
已完成细胞 cVEN09 的映射
已完成细胞 cVEN10 的映射
已保存前10个细胞的结果
已完成细胞 cVEN11 的映射
已完成细胞 cVEN12 的映射
已完成细胞 cVEN13 的映射
已完成细胞 cVEN14 的映射
已完成细胞 cVEN15 的映射
已完成细胞 cVEN16 的映射
已完成细胞 cVEN17 的映射
已完成细胞 cVEN18 的映射
已完成细胞 cVEN19 的映射
已完成细胞 cVEN20 的映射
已保存前20个细胞的结果
已完成细胞 cVEN21 的映射
已完成细胞 cVEN22 的映射
已完成细胞 PC02 的映射
已完成细胞 PC03 的映射
已完成细胞 PC04 的映射
已完成细胞 PC05 的映射
已完成细胞 PC06 的映射
已完成细胞 PC07 的映射
已完成细胞 PC08 的映射
已完成细胞 PC09 的映射
已保存前30个细胞的结果
已完成细胞 PC10 的映射
已完成细胞 PC11 的映射
已完成细胞 PC12 的映射
已完成细胞 PC13 的映射
已完成细胞 PC14 的映射
已完成细胞 PC15 的映射
已完成细胞 PC16 的映射
已完成细胞 PC17 的映射
已完成细胞 PC18 的映射
已完成细胞 PC19 的映射
已保存前40个细胞的结果
已完成细胞 PC20 的映射
已完成细胞 PC21 的映射
已完成细胞 PC22 的映射
已完成细胞 PC23 的映射
已完成细胞 PC24 的映射
已完成细胞 PC25 的映射
已完成细胞 PC26 的映射
已完成细胞 PC27 的映射
已完成细胞 PC28 的映射
已完成细胞 PC29 的映射
已保存前50个细胞的结果
已完成细胞 PC30 的映射
已完成细胞 PC31 的映射
已完成细胞 PC32 的映射
已完成细胞 PC33 的映射
已完成细胞 PC34 的映射
已完成细胞 PC35 的映射
已完成细胞 PC36 的映射
已完成细胞 PC37 的映射
已完成细胞 PC38 的映射
已完成细胞 

In [None]:
# with open('./result/celltype_scoressmall2.json', 'r') as file:
#     cell_mapping_results = json.load(file)

In [None]:
# 保存细胞名和得分最高的细胞类型
celltype_result = []

# 遍历字典，找到每个细胞名对应的得分最高的细胞类型
for cell_name, types_scores in cell_mapping_results.items():
    # 找到得分最高的细胞类型
    best_type = max(types_scores, key=types_scores.get)
    # 保存到结果列表
    celltype_result.append([cell_name, best_type])

# 将结果转换为 pandas DataFrame
df_result = pd.DataFrame(celltype_result, columns=["cell_name", "cell_type"])

In [None]:
df_result.to_csv("./result/mapping/cell_type_resultssmall2.csv", index=False)

In [None]:
df_result['cell_type'].unique()

In [None]:
cell_types = ['Exc L4-5 FEZF2 SCN4B','Exc L5-6 THEMIS C1QL3','Exc L2-3 LINC00507 FREM3','Exc L2 LAMP5 LTK','Exc L2-4 LINC00507 GLP2R','Exc L3-4 RORB CARM1P1',
'Exc L3-5 RORB ESR1','Exc L3-5 RORB COL22A1','Exc L3-5 RORB FILIP1L','Exc L3-5 RORB TWIST2','Exc L4-5 RORB FOLH1B','Exc L4-6 RORB SEMA3E',
'Exc L4-5 RORB DAPK2','Exc L5-6 RORB TTC12','Exc L4-6 RORB C1R']
# 创建 DataFrame，细胞为行，细胞类型为列，初始化为 0
df = pd.DataFrame(0, index=cell_mapping_results.keys(), columns=cell_types)

# 填充 DataFrame
for cell, types_scores in cell_mapping_results.items():
    for cell_type, score in types_scores.items():
        if cell_type in df.columns:
            df.loc[cell, cell_type] = score



In [None]:
import re

# 获取所有行标签
row_labels = df.index.tolist()

# 定义前缀的排序顺序
prefix_order = ['cVEN', 'ETPC', 'ETPC2', 'PC', 'TRI']

# 定义排序函数：先根据前缀排序，再根据数字排序
def sort_key(label):
    # 提取前缀
    prefix = next((p for p in prefix_order if label.startswith(p)), '')
    # 提取数字部分，若无数字则为 0
    number = int(re.findall(r'\d+', label)[0]) if re.findall(r'\d+', label) else 0
    # 返回前缀顺序和数字，确保先按前缀排序，再按数字排序
    return (prefix_order.index(prefix), number)

# 根据自定义的排序函数对行标签进行排序
sorted_rows = sorted(row_labels, key=sort_key)

# 重新排序 df 的行
df_sorted = df.reindex(sorted_rows)



In [None]:
import seaborn as sns
clustergrid1 = sns.clustermap(df_sorted.T,row_cluster=False,col_cluster=False,method='ward',dendrogram_ratio=(0.02, 0.10), cmap='YlGnBu', figsize=(30, 10),cbar_pos=(1, 0.7, 0.01, 0.2)) # 设置颜色条的位置和大小)#,metric='cosine'
clustergrid1.savefig("./result/mapping/celltpye_scores_heatmapsmall2.pdf", dpi=300)

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# 假设 df_result 包含 'cell_name' 和 'cell_type' 两列
# 统计每种 cell_type 的数量
cell_type_counts = df_result['cell_type'].value_counts()

# 创建一个柱状图
plt.figure(figsize=(10, 8))  # 调整图形大小
ax = sns.barplot(x=cell_type_counts.index, y=cell_type_counts.values, palette="viridis")

# 在柱状图上添加数字标签
for i, value in enumerate(cell_type_counts.values):
    ax.text(i, value + 0.5, str(value), ha='center', fontsize=10)  # 将文本置于每个柱子上方，+0.5 可以稍微提高数字位置

# 添加标题和标签
plt.title('Number of Cells for Each Cell Type', fontsize=16)
plt.xlabel('Cell Type', fontsize=12)
plt.ylabel('Number of Cells', fontsize=12)

# 旋转x轴标签以避免重叠
plt.xticks(rotation=90, fontsize=10)

# 显示图像
plt.tight_layout()
plt.savefig("./result/mapping/cell_type_barplot_with_countssmall2.pdf", dpi=300, bbox_inches='tight')
plt.show()
