In [2]:
import pandas as pd
import numpy as np
import subprocess
import os
from typing import Dict, List, Set
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

def read_hapmap_positions(file_path: str) -> Dict[int, Set[str]]:
    """Read HapMap3 SNP positions and group them by chromosome.

    Every non-empty line of the file must have the form ``chrom:pos`` where
    ``chrom`` is an integer. Blank lines (for example a trailing empty line
    at the end of the file) are ignored instead of aborting the whole read.

    Args:
        file_path: Path to the text file with one ``chrom:pos`` entry per line.

    Returns:
        A dict mapping the integer chromosome to the set of ``"chrom:pos"``
        strings seen for that chromosome.

    Raises:
        ValueError: If a non-empty line does not split into exactly two
            ``:``-separated fields or its chromosome is not an integer.
    """
    positions_by_chr: Dict[int, Set[str]] = {}

    print("Reading HapMap3 positions...")
    with open(file_path, 'r') as f:
        for line in f:
            entry = line.strip()
            if not entry:
                # Blank lines carry no position; skipping them avoids a
                # ValueError from the two-field unpacking below.
                continue
            chrom_text, pos = entry.split(':')
            chrom = int(chrom_text)
            positions_by_chr.setdefault(chrom, set()).add(f"{chrom}:{pos}")

    return positions_by_chr

def get_population_samples(panel_file: str, target_pops: List[str]) -> Dict[str, List[str]]:
    """Collect the sample IDs belonging to each requested population.

    The panel file is read as a tab-separated table; rows are selected by
    comparing its ``pop`` column with each requested population code, and the
    matching values of the ``sample`` column are returned in file order.

    Args:
        panel_file: Path to the tab-separated panel file.
        target_pops: Population codes to look up.

    Returns:
        A dict mapping each requested population code to the list of its
        sample IDs (an empty list when no row matches).
    """
    panel = pd.read_csv(panel_file, sep='\t')

    samples_by_pop = {}
    for pop in target_pops:
        belongs_to_pop = panel['pop'] == pop
        samples_by_pop[pop] = panel.loc[belongs_to_pop, 'sample'].tolist()
    return samples_by_pop

def filter_vcf_by_positions(input_vcf: str, 
                          output_vcf: str, 
                          positions: Set[str],
                          temp_dir: str):
    """Filter a VCF down to the given SNP positions and index the result.

    The positions are written to a temporary tab-separated targets file,
    which is passed to ``bcftools view -T``; the compressed output is then
    indexed with ``bcftools index``.

    Args:
        input_vcf: Path of the VCF to filter.
        output_vcf: Path of the bgzip-compressed VCF to write.
        positions: ``"chrom:pos"`` strings to keep.
        temp_dir: Existing directory in which the temporary targets file is
            created.

    Raises:
        subprocess.CalledProcessError: If a bcftools command exits non-zero.
        FileNotFoundError: If the ``bcftools`` executable cannot be found.
    """
    temp_pos_file = os.path.join(temp_dir, f"temp_positions_{os.path.basename(input_vcf)}.txt")
    try:
        # Write the temporary targets file (CHROM<TAB>POS per line).
        with open(temp_pos_file, 'w') as f:
            for entry in positions:
                chrom, pos = entry.split(':')
                f.write(f"{chrom}\t{pos}\n")

        # Argument lists (no shell) keep paths containing spaces or shell
        # metacharacters intact.
        subprocess.run(
            ['bcftools', 'view', '-T', temp_pos_file, '-Oz', '-o', output_vcf, input_vcf],
            check=True,
        )

        # -f overwrites a stale index left next to output_vcf by an earlier,
        # interrupted run instead of failing on it.
        subprocess.run(['bcftools', 'index', '-f', output_vcf], check=True)
    finally:
        # Remove the temporary targets file even when bcftools fails.
        if os.path.exists(temp_pos_file):
            os.remove(temp_pos_file)

def extract_population_data(vcf_file: str, 
                          samples: List[str], 
                          output_file: str,
                          temp_dir: str):
    """Extract the genotypes of the given samples into a text file.

    Runs ``bcftools view -S <samples> <vcf>`` and pipes its output into
    ``bcftools query``, writing one ``CHROM:POS`` line per variant followed
    by one tab-separated genotype per sample to ``output_file``.

    Args:
        vcf_file: Path of the VCF to read.
        samples: Sample IDs to keep.
        output_file: Path of the text file to write.
        temp_dir: Existing directory in which the temporary sample list is
            created.

    Raises:
        subprocess.CalledProcessError: If either bcftools command exits
            non-zero (both stages of the pipeline are checked).
        FileNotFoundError: If the ``bcftools`` executable cannot be found.
    """
    temp_samples = os.path.join(temp_dir, f"temp_samples_{os.path.basename(vcf_file)}.txt")
    view_cmd = ['bcftools', 'view', '-S', temp_samples, vcf_file]
    # With no input file argument, bcftools query reads from the piped stdin.
    query_cmd = ['bcftools', 'query', '-f', '%CHROM:%POS[\t%GT]\n']

    try:
        # Write the temporary sample list, one ID per line.
        with open(temp_samples, 'w') as f:
            f.writelines(f"{sample}\n" for sample in samples)

        # Build the pipeline without a shell so that paths are passed
        # verbatim and the exit status of BOTH stages can be inspected
        # (a shell pipeline only reports the last command's status).
        with open(output_file, 'w') as out:
            view = subprocess.Popen(view_cmd, stdout=subprocess.PIPE)
            try:
                subprocess.run(query_cmd, stdin=view.stdout, stdout=out, check=True)
            finally:
                # Closing our end of the pipe lets `view` terminate if the
                # query stage exited early; then reap the process.
                view.stdout.close()
                view_status = view.wait()

        if view_status != 0:
            raise subprocess.CalledProcessError(view_status, view_cmd)
    finally:
        # Remove the temporary sample list even when bcftools fails.
        if os.path.exists(temp_samples):
            os.remove(temp_samples)

def process_chromosome(chrom: int, 
                      hapmap_positions: Set[str],
                      pop_samples: Dict[str, List[str]],
                      data_dir: str,
                      output_dir: str,
                      temp_dir: str) -> Dict:
    """Process one chromosome: filter to HapMap SNPs, then extract per population.

    The chromosome VCF ``filtered.chr{chrom}.phase3.vcf.gz`` in ``data_dir``
    is filtered to ``hapmap_positions`` into an intermediate VCF in
    ``temp_dir``. For every population, the genotypes of its samples are
    written to ``{pop}.chr{chrom}.txt`` in ``output_dir`` and the number of
    lines in that file is recorded as the SNP count.

    Args:
        chrom: Chromosome number.
        hapmap_positions: ``"chrom:pos"`` strings to keep for this chromosome.
        pop_samples: Mapping of population code to its sample IDs.
        data_dir: Directory containing the input VCF.
        output_dir: Directory receiving the per-population text files.
        temp_dir: Directory for intermediate files.

    Returns:
        ``{'chromosome': chrom, <pop>: <snp_count>, ...}`` on success, or
        ``None`` if any step failed (the error is printed, not raised).
    """
    filtered_vcf = None
    try:
        print(f"\nProcessing chromosome {chrom}...")
        
        # Input VCF file
        input_vcf = os.path.join(data_dir, f'filtered.chr{chrom}.phase3.vcf.gz')
        
        # Intermediate, HapMap-filtered VCF file
        filtered_vcf = os.path.join(temp_dir, f'filtered_hapmap.chr{chrom}.vcf.gz')
        
        # Filter the VCF, keeping only HapMap SNPs
        print(f"Filtering chromosome {chrom} for HapMap SNPs...")
        filter_vcf_by_positions(input_vcf, filtered_vcf, hapmap_positions, temp_dir)
        
        # Extract the data for every population
        results = {}
        for pop, samples in pop_samples.items():
            print(f"Extracting data for {pop} chromosome {chrom}...")
            output_file = os.path.join(output_dir, f'{pop}.chr{chrom}.txt')
            extract_population_data(filtered_vcf, samples, output_file, temp_dir)
            
            # Count SNPs: one output line per variant
            with open(output_file, 'r') as f:
                snp_count = sum(1 for _ in f)
            results[pop] = snp_count
        
        return {'chromosome': chrom, **results}
        
    except Exception as e:
        # Deliberate best-effort: report the failure and let the caller
        # continue with the remaining chromosomes.
        print(f"Error processing chromosome {chrom}: {e}")
        return None

    finally:
        # Remove the intermediate VCF and its index on success AND on
        # failure, so a failed chromosome does not leave large files behind.
        if filtered_vcf is not None:
            for leftover in (filtered_vcf, filtered_vcf + '.csi'):
                try:
                    if os.path.exists(leftover):
                        os.remove(leftover)
                except OSError as cleanup_error:
                    print(f"Could not remove {leftover}: {cleanup_error}")

def main():
    """Run the pipeline: filter chromosomes 1-22 and merge per-population output.

    Steps:
      1. Read the HapMap positions and the sample IDs of the target populations.
      2. Process chromosomes 1-22 on a pool of three worker threads.
      3. Write a per-chromosome SNP-count summary to ``processing_summary.csv``.
      4. Concatenate the per-chromosome files of every population (in
         chromosome order) into ``{pop}.genotypes.txt`` and delete the pieces.
      5. Remove the temporary directory.

    Chromosomes without HapMap positions are skipped, and chromosomes whose
    processing failed are reported, rather than aborting the whole run.
    """
    # Function-scope import keeps this block self-contained.
    import shutil

    # Configuration
    DATA_DIR = '../data/1000g'  # directory with the 1000G VCF files
    OUTPUT_DIR = '../data/population_data'  # output directory
    TEMP_DIR = '../data/temp'  # directory for temporary files
    PANEL_FILE = '../data/1000g/integrated_call_samples.panel'  # sample information file
    HAPMAP_FILE = '../data/1000g/hapmap3_snps_formatted.txt'  # HapMap SNP position file
    
    # Target populations
    target_pops = ['ASW', 'CEU', 'CHB', 'JPT', 'LWK', 'MXL', 'TSI', 'YRI']
    chromosomes = range(1, 23)
    
    # Create the required directories
    for directory in [OUTPUT_DIR, TEMP_DIR]:
        os.makedirs(directory, exist_ok=True)
    
    # Read the HapMap positions and the population sample information
    hapmap_positions = read_hapmap_positions(HAPMAP_FILE)
    pop_samples = get_population_samples(PANEL_FILE, target_pops)
    
    # Process all chromosomes
    results = []
    failed_chroms = []
    with ThreadPoolExecutor(max_workers=3) as executor:
        future_to_chrom = {}
        for chrom in chromosomes:
            chrom_positions = hapmap_positions.get(chrom)
            if not chrom_positions:
                # A chromosome absent from the HapMap file would otherwise
                # raise KeyError here and abort every other chromosome.
                print(f"No HapMap positions for chromosome {chrom}; skipping.")
                continue
            future = executor.submit(
                process_chromosome,
                chrom,
                chrom_positions,
                pop_samples,
                DATA_DIR,
                OUTPUT_DIR,
                TEMP_DIR
            )
            future_to_chrom[future] = chrom
        
        for future in as_completed(future_to_chrom):
            result = future.result()
            if result:
                results.append(result)
            else:
                failed_chroms.append(future_to_chrom[future])
    
    if failed_chroms:
        print(f"\nChromosomes that failed and are missing from the output: "
              f"{sorted(failed_chroms)}")
    
    # Build the summary report; sorting needs the 'chromosome' column, which
    # only exists when at least one chromosome succeeded.
    summary_df = pd.DataFrame(results)
    if results:
        summary_df = summary_df.sort_values('chromosome')
    
    # Save the summary report
    summary_file = os.path.join(OUTPUT_DIR, 'processing_summary.csv')
    summary_df.to_csv(summary_file, index=False)
    
    # Merge the chromosome files of each population
    print("\nMerging chromosome files for each population...")
    for pop in target_pops:
        final_file = os.path.join(OUTPUT_DIR, f'{pop}.genotypes.txt')
        with open(final_file, 'w') as merged:
            for chrom in chromosomes:
                chr_file = os.path.join(OUTPUT_DIR, f'{pop}.chr{chrom}.txt')
                if not os.path.exists(chr_file):
                    continue
                # Stream the piece into the merged file instead of holding
                # every line of every chromosome in memory.
                with open(chr_file, 'r') as part:
                    shutil.copyfileobj(part, merged)
                os.remove(chr_file)  # delete the single-chromosome file
    
    # Clean up the temporary directory
    shutil.rmtree(TEMP_DIR)
    
    print("\nProcessing complete!")
    print(f"Summary saved to: {summary_file}")
    print("\nFinal statistics:")
    if results:
        print(summary_df.describe())
    else:
        # describe() raises on a DataFrame without columns.
        print("No chromosome was processed successfully.")

# Run the pipeline only when this code is executed directly, not when imported.
if __name__ == "__main__":
    main()

Reading HapMap3 positions...

Processing chromosome 1...
Filtering chromosome 1 for HapMap SNPs...

Processing chromosome 2...
Filtering chromosome 2 for HapMap SNPs...

Processing chromosome 3...
Filtering chromosome 3 for HapMap SNPs...
Extracting data for ASW chromosome 3...
Extracting data for CEU chromosome 3...
Extracting data for CHB chromosome 3...
Extracting data for JPT chromosome 3...
Extracting data for LWK chromosome 3...
Extracting data for MXL chromosome 3...
Extracting data for TSI chromosome 3...
Extracting data for YRI chromosome 3...

Processing chromosome 4...
Filtering chromosome 4 for HapMap SNPs...
Extracting data for ASW chromosome 1...
Extracting data for CEU chromosome 1...
Extracting data for CHB chromosome 1...
Extracting data for JPT chromosome 1...
Extracting data for LWK chromosome 1...
Extracting data for MXL chromosome 1...
Extracting data for TSI chromosome 1...
Extracting data for YRI chromosome 1...

Processing chromosome 5...
Filtering chromosome 5 