# 一、vcf和csv的ID映射

In [41]:
import pandas as pd

In [43]:
df_本研究= pd.read_csv('../input/订正之后的单倍群名称_LLT.csv')
df_本研究.loc[:,['ID']].to_csv('../conf/本项目_samples.txt', index=False, header=False)

In [44]:
!bcftools query -l ../data/merged_biallelic_complete_fixed_anno.vcf.gz \
   |sort -u > ../conf/VCF_samples.txt

In [45]:
# 1. 对两个原始样本文件排序，确保符合 comm 要求
!LC_ALL=C sort ../conf/本项目_samples.txt > ../conf/本项目_samples_sorted.txt
!LC_ALL=C sort ../conf/VCF_samples.txt     > ../conf/VCF_samples_sorted.txt

# 2. 使用 comm 提取两者共有的样本名（即交集部分）
!comm -12 ../conf/本项目_samples_sorted.txt ../conf/VCF_samples_sorted.txt \
  > ../conf/本项目_VCF_samples_common.txt
!rm ../conf/VCF_samples.txt ../conf/本项目_samples.txt ../conf/本项目_samples_sorted.txt ../conf/VCF_samples_sorted.txt

In [46]:
df_本研究

Unnamed: 0,ID,Haplogroup
0,KU682980,I2
1,KU683240,I2
2,KU683033,I2
3,KU683502,I2
4,EF556173,L5a1a
...,...,...
158007,A76LA8QYT57L,Z4a1
158008,Hui_Dahejia3804,Z5
158009,Han_Taian5030,Z7
158010,K38531,Z7


In [49]:
df_comm = pd.read_csv('../conf/本项目_VCF_samples_common.txt', header=None)
df_comm.rename(columns={0:'ID'}, inplace=True)
df_comm = df_comm.merge(df_本研究,on='ID',how='left')
df_comm.to_csv('../input/ID_Hap.csv', index=False, header=True)

# 二、对本项目的在VCF存在的单倍群进行整理

In [51]:
%%writefile 自动提取单倍群列表.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
脚本：自动提取单倍群列表.py
功能：对输入的 CSV（ID,Haplogroup）和 Phylotree 树文件：
    1. 去重获取所有目标单倍群；
    2. 分别提取每个目标单倍群的直接子节点和所有后代节点对应样本，
       输出到指定目录下的 direct_children.csv 与 all_descendants.csv。
用法：
    python3 自动提取单倍群列表.py \
        --haplo-file /path/to/phylotree.txt \
        --input-csv  /path/to/samples.csv \
        --out-dir     /path/to/output_directory \
        [--encoding utf-8]
"""
import os
import csv
import argparse


def parse_tree(haplo_file, encoding='utf-8'):
    """解析 Phylotree 树文件，返回 (level, name) 列表"""
    tree = []
    with open(haplo_file, 'r', encoding=encoding) as f:
        for raw in f:
            if not raw.strip():
                continue
            level = raw.count('\t')
            name = raw.strip()
            tree.append((level, name))
    return tree


def collect_downstreams(tree, query, direct_only=False):
    """
    收集 query 自身及下游节点名称列表
    direct_only=True 仅直接子节点；False 则所有后代（含自身）
    """
    found = False
    base_level = None
    downstream = []
    for level, name in tree:
        if not found:
            if name == query:
                found = True
                base_level = level
                downstream.append(name)  # 包含自身
            continue
        # 已经找到 query
        if direct_only:
            if level == base_level + 1:
                downstream.append(name)
            elif level <= base_level:
                break
        else:
            if level > base_level:
                downstream.append(name)
            else:
                break
    return downstream


def main():
    parser = argparse.ArgumentParser(
        description="自动提取直接子单倍群与所有后代对应样本"
    )
    parser.add_argument('--haplo-file', '-hf', required=True,
                        help="Phylotree 树文件路径")
    parser.add_argument('--input-csv', '-i', required=True,
                        help="样本 CSV，第一列 ID，第二列 Haplogroup")
    parser.add_argument('--out-dir', '-o', required=True,
                        help="输出目录，文件名自动生成")
    parser.add_argument('--encoding', default="utf-8",
                        help="文件编码（默认 utf-8）")
    args = parser.parse_args()

    # 1. 确保输出目录存在
    out_dir = args.out_dir
    os.makedirs(out_dir, exist_ok=True)

    # 2. 解析树文件
    tree = parse_tree(args.haplo_file, encoding=args.encoding)

    # 3. 读取原始 CSV 并去重获取目标单倍群列表
    samples = []
    targets = set()
    with open(args.input_csv, 'r', encoding=args.encoding, newline='') as f:
        reader = csv.reader(f)
        for row in reader:
            if len(row) < 2:
                continue
            sid, hap = row[0].strip(), row[1].strip()
            samples.append((sid, hap))
            targets.add(hap)
    targets = sorted(targets)

    # 4. 自动生成两个输出文件路径
    direct_path = os.path.join(out_dir, "直接后代.csv")
    all_path    = os.path.join(out_dir, "所有后代.csv")

    # 5. 打开输出文件并写入表头
    header = ['Target_Haplogroup', 'ID', 'Haplogroup']
    with open(direct_path, 'w', encoding=args.encoding, newline='') as fd, \
         open(all_path,    'w', encoding=args.encoding, newline='') as fa:

        writer_direct = csv.writer(fd)
        writer_all    = csv.writer(fa)
        writer_direct.writerow(header)
        writer_all.writerow(header)

        # 6. 遍历每个目标单倍群，匹配并写入
        for tgt in targets:
            downstream_direct = collect_downstreams(tree, tgt, direct_only=True)
            downstream_all    = collect_downstreams(tree, tgt, direct_only=False)

            for sid, hap in samples:
                if hap in downstream_direct:
                    writer_direct.writerow([tgt, sid, hap])
                if hap in downstream_all:
                    writer_all.writerow([tgt, sid, hap])

    print(f"✅ 直接子单倍群结果已写入：{os.path.abspath(direct_path)}")
    print(f"✅ 所有后代结果已写入：{os.path.abspath(all_path)}")


if __name__ == '__main__':
    main()


Overwriting 自动提取单倍群列表.py


In [52]:
# 第一列 ID，第二列 Haplogroup
!/home/luolintao/miniconda3/bin/python3 \
  ./自动提取单倍群列表.py \
  --haplo-file "../conf/线粒体单倍群phylotree(version17)2025年3月12日.txt" \
  --input-csv ../input/ID_Hap.csv \
  --out-dir ../input/

✅ 直接子单倍群结果已写入：/mnt/f/OneDrive/文档（科研）/脚本/我的科研脚本/Python/母系专用/ρ方法/2-从vcf/input/直接后代.csv
✅ 所有后代结果已写入：/mnt/f/OneDrive/文档（科研）/脚本/我的科研脚本/Python/母系专用/ρ方法/2-从vcf/input/所有后代.csv


# 二、准备rho方法文件

## haplogrep分数筛选

In [76]:
import pandas as pd

In [77]:
df_质量控制 = pd.read_csv('../input/质量控制.csv',low_memory=False).loc[:,['SampleID','Quality']]
df_质量控制

Unnamed: 0,SampleID,Quality
0,KU682980,1.0092
1,KU683240,1.0092
2,KU683033,1.0092
3,KU683502,1.0092
4,EF556173,1.008
...,...,...
111296,Han_Guangdong1705,LLT_Illumina
111297,Han_Henan2440,LLT_Illumina
111298,Han_Sichuan3602,LLT_Illumina
111299,Salar_Dahejia3801,LLT_Illumina


In [78]:
df_所有后代  = pd.read_csv('../input/所有后代.csv').iloc[:,0:2]
df_所有后代  =  df_所有后代.merge(df_质量控制, left_on='ID',
                          right_on='SampleID', how='left')
df_所有后代.loc[df_所有后代['Quality'].astype(float)>=0.85]

Unnamed: 0,Target_Haplogroup,ID,SampleID,Quality
0,A,1000612,1000612,0.9581
1,A,1009704,1009704,0.9746
2,A,111-1201-7473,111-1201-7473,0.946
3,A,111-1414-1183,111-1414-1183,0.9764
4,A,111-1414-2845,111-1414-2845,0.9043
...,...,...,...,...
494947,Z_T152C,SD265,SD265,0.9367
494948,Z_T152C,SD267,SD267,0.9046
494949,Z_T152C,WDC11806290079,WDC11806290079,0.9456
494950,Z_T152C,WDC11806290096,WDC11806290096,0.948


In [79]:
df_所有后代.rename(columns={'Target_Haplogroup':'Haplogroup'}, inplace=True)
df_所有后代.loc[:,['ID','Haplogroup']].to_csv('../input/rho.csv', index=False)

# 四、计算每种单倍群的数量

In [85]:
df_rho = pd.read_csv('../input/rho.csv')
df_rho['Count'] = df_rho.groupby('Haplogroup')['Haplogroup'].transform('count')
df_rho.loc[df_rho['Count']>=10,['Haplogroup','Count']].drop_duplicates().to_csv('../output/Rho_number.csv',index=False)

In [86]:
df_rho.loc[df_rho['Count']>=10,['Haplogroup','Count']]

Unnamed: 0,Haplogroup,Count
0,A,3037
1,A,3037
2,A,3037
3,A,3037
4,A,3037
...,...,...
494947,Z_T152C,963
494948,Z_T152C,963
494949,Z_T152C,963
494950,Z_T152C,963
