In [None]:
from pathlib import Path
import glob
import pandas as pd
import numpy as np
import multiprocessing
from itertools import chain, islice
from tqdm import tqdm
import shutil

In [None]:
# Project layout: per-participant result TSVs live under results/, aggregated outputs go to summary/
projectDir = Path("/home/_shared/jscliu/project/2025/Flagship/analysis/secondary/dominant/01.known_small_variants")
referenceDir = Path("/home/_shared/jscliu/project/2025/Flagship/reference")
resultDir = projectDir.joinpath("results")
summaryDir = projectDir.joinpath("summary")
summaryDir.mkdir(exist_ok=True)    # no parents=True: projectDir itself must already exist

# Reference
af_bed = referenceDir.joinpath("acmg_sf3_2.ext200bp.bed")
cohort_founder_list = referenceDir.joinpath("cohort_founder_list.2024-11-12.csv")
sample_info_csv = referenceDir.joinpath("sample_info_annot.2024-11-12.csv")
vcf_headers = referenceDir.joinpath("hkgi_dominant.vcf_headers.txt")

# Output
out_tsv = summaryDir.joinpath("dominant.known_variants.tsv")
out_vcf = summaryDir.joinpath("dominant.known_variants.vcf")

In [None]:
# Read the founder list as a header-less CSV; the two columns are named here
cohort_founder_df = pd.read_csv(
    cohort_founder_list,
    header=None,
    names=['participant_id', 'sample_id'],
)

In [None]:
# One result TSV path per participant in the founder list
tsv_ls:list = [
    resultDir.joinpath(f"{participant}.split.rare.vep.additionalFinding.tsv")
    for participant in cohort_founder_df['participant_id']
]

In [None]:
def split_list(lst, chunk_size):
    """Split `lst` into consecutive chunks of at most `chunk_size` items.

    Returns a list of lists; the last chunk holds the remainder and may be
    shorter. An empty `lst` yields an empty list.
    """
    source = iter(lst)
    chunks = []
    # One pass per chunk start offset; each pass drains the next chunk_size items
    for _ in range(0, len(lst), chunk_size):
        chunks.append(list(islice(source, chunk_size)))
    return chunks

# Process the TSVs in batches of 2000 files
chunk_tsv_ls = split_list(tsv_ls, chunk_size=2000)

# Read reference af_bed

In [None]:
# Tab-separated read; the first line of the BED is used as the column header
af_gene_df = pd.read_csv(af_bed, sep="\t")

In [None]:
# Values of the BED `name` column, used further down to filter on the Gene column
af_genes:list = af_gene_df.loc[:, 'name'].tolist()

# Read and concatenate result TSV as master pd.DataFrame

In [None]:
def read_result_tsv(tsv:Path):
    """Load one per-participant result TSV and keep only the ClinVar-annotated rows.

    Parameters
    ----------
    tsv : Path
        Tab-separated result file. The participant ID is taken from the part
        of the file name before the first ".". The file must contain the
        columns CHROM, POS, REF, ALT and `clinvar.vcf.gz`.

    Returns
    -------
    pd.DataFrame
        Rows whose `clinvar.vcf.gz` value is present, with two added columns:
        `CPRA` ("CHROM-POS-REF-ALT") and `participant_id`. The
        `clinvar.vcf.gz` column is cast to int.
    """
    # "." is read as a missing value in every column
    df = pd.read_table(tsv, na_values=".", low_memory=False)
    # Build the variant key in one pass over the four columns rather than
    # with a row-wise DataFrame.apply, which is far slower and behaves
    # specially on empty frames.
    df['CPRA'] = [
        f"{chrom}-{pos}-{ref}-{alt}"
        for chrom, pos, ref, alt in zip(df['CHROM'], df['POS'], df['REF'], df['ALT'])
    ]
    df['participant_id'] = tsv.name.split(".")[0]
    clinvar_df = df.loc[df['clinvar.vcf.gz'].notna(), :].copy()
    return clinvar_df.astype({'clinvar.vcf.gz': int})

In [None]:
# Per-chunk frames grouped by CPRA; they are merged into the master frame afterwards
group_df_ls = list()
for working_tsv_ls in tqdm(chunk_tsv_ls):
    # Read this chunk's TSVs in parallel with up to 40 worker processes.
    # The `with` block releases the workers even when read_result_tsv raises;
    # pool.map blocks until every file has been read, so all results are
    # collected before the pool is torn down.
    with multiprocessing.Pool(processes=40) as pool:
        results = pool.map(read_result_tsv, working_tsv_ls)

    # Stack the per-file frames, then collect each column's values per variant into a list
    result_df = pd.concat(results)
    group_df = result_df.groupby('CPRA').agg(list)

    # Append to group_df_ls
    group_df_ls.append(group_df)

In [None]:
# Stack the per-chunk grouped frames and group by CPRA once more.
# Each cell of the master frame is now a list of the per-chunk lists.
af_df = (
    pd.concat(group_df_ls)
    .groupby('CPRA')
    .agg(list)
)

In [None]:
# Give the ClinVar annotation columns shorter names
af_df = af_df.rename(
    columns={
        "clinvar.vcf.gz": "clinVar_ID",
        "clinvar.vcf.gz_CLNSIG": "CLNSIG",
        "clinvar.vcf.gz_CLNREVSTAT": "CLNREVSTAT",
    }
)

In [None]:
# Columns collapsed to a single value per variant (the first observed value is kept)
variant_cols:list = [
    # locus
    "CHROM", "POS", "REF", "ALT",
    # annotation
    "Gene", "HGVSc", "HGVSp", "Consequence", "REVEL", "SpliceAI",
    # ClinVar
    "clinVar_ID", "CLNSIG", "CLNREVSTAT",
]
# Columns that keep one entry per carrier
individual_cols:list = [
    "participant_id",
    "GT",
    "AD",
]

In [None]:
# Each cell is a list of lists; keep the first value of the first inner list
for col in variant_cols:
    af_df[col] = af_df[col].map(lambda nested: nested[0][0])

In [None]:
# Flatten the list of lists into one flat list per variant
for col in individual_cols:
    af_df[col] = af_df[col].map(
        lambda nested: [item for inner in nested for item in inner]
    )

In [None]:
# Keep only variants whose Gene value appears in the af_genes list
print(f"No. of variants before filtering: {len(af_df)}")
af_df = af_df[af_df['Gene'].isin(af_genes)].copy()
print(f"No. of variants after filtering: {len(af_df)}")

# Label CLNSIG and No. of gold stars

In [None]:
# Translate a ClinVar CLNSIG value into the clinical_relevance label
def map_clinical_relevance(clnsig):
    """Return the clinical_relevance label for a CLNSIG value.

    Values that are not in the lookup table are labelled "other".
    """
    relevance_by_clnsig = {
        "Uncertain_significance": "vus",
        "Conflicting_classifications_of_pathogenicity": "vus",
        "Likely_benign": "likely_benign",
        "Benign/Likely_benign": "likely_benign",
        "Likely_pathogenic": "likely_pathogenic",
        "Pathogenic/Likely_pathogenic": "likely_pathogenic",
        "Pathogenic": "pathogenic",
        "Benign": "benign",
    }
    return relevance_by_clnsig.get(clnsig, "other")
# Label every variant from its CLNSIG value
af_df['clinical_relevance'] = af_df['CLNSIG'].map(map_clinical_relevance)

In [None]:
def classify_goldstar(clnrevstat:str)->int:
    """Return the number of ClinVar gold stars (0-4) for a CLNREVSTAT value.

    Keys use "&" where ClinVar's own review-status text has a comma.

    Raises
    ------
    KeyError
        If `clnrevstat` is not a recognised review status.
    """
    map_d = {
        'practice_guideline': 4,
        'reviewed_by_expert_panel': 3,
        'criteria_provided&_multiple_submitters&_no_conflicts': 2,
        'criteria_provided&_conflicting_classifications': 1,
        'criteria_provided&_single_submitter': 1,
        'no_assertion_criteria_provided': 0,
        'no_classification_provided': 0,
        'no_classification_for_the_single_variant': 0,
        # Zero-star status that was missing from the table and raised a bare KeyError
        'no_classifications_from_unflagged_records': 0,
    }
    try:
        return map_d[clnrevstat]
    except KeyError:
        # Same exception type as before, but naming the offending value
        raise KeyError(f"Unrecognised CLNREVSTAT value: {clnrevstat!r}") from None
# Gold-star count per variant from its CLNREVSTAT value
af_df['goldstars_n'] = af_df['CLNREVSTAT'].map(classify_goldstar)

# Calculate the AF, het_n & hom_n, hem_n

In [None]:
# Sample annotation table, indexed by participant ID for the .loc lookups below
sample_info_df = pd.read_csv(sample_info_csv).set_index("sre_participant_id")

In [None]:
# Count of founders per inferred sex (the 'Male' and 'Female' keys are read later)
sex_cnt = dict(
    sample_info_df
    .loc[cohort_founder_df['participant_id'], 'inferred_sex']
    .value_counts()
)

In [None]:
# Carrier counts per variant, taken from the pooled genotype strings
af_df['het_n'] = af_df['GT'].apply(lambda g: g.count('0/1') + g.count('0|1') + g.count('1/0') + g.count('1|0'))
af_df['hom_n'] = af_df['GT'].apply(lambda g: g.count('1/1') + g.count('1|1'))
af_df['hem_n'] = af_df['GT'].apply(lambda g: g.count('1'))    # single-allele (haploid) calls

# AN = total number of alleles in the cohort at the site: two per founder,
# except chrX where a male contributes one.
# NOTE(review): this assumes every founder has a call at every site and
# ignores the chrX pseudo-autosomal regions — confirm this is acceptable.
af_df['AN'] = af_df['CHROM'].apply(
    lambda c: 2*sex_cnt['Female']+1*sex_cnt['Male'] if c=='chrX' else 2*sex_cnt['Female']+2*sex_cnt['Male']
)
# AC = number of alternate alleles observed. The two column names were
# previously swapped: "AN" held this count and "AC" held the cohort total.
af_df['AC'] = af_df['het_n'] + 2*af_df['hom_n'] + af_df['hem_n']
# AF = AC / AN (numerically the same as before the rename)
af_df['AF'] = af_df['AC'] / af_df['AN']

# Classify the prevalence

In [None]:
# Label a variant as private, rare or common
def classify_prevalence(participant_id:list, AF:float)->str:
    """Return 'private', 'rare' or 'common' for one variant.

    A variant with exactly one entry in `participant_id` is 'private'
    whatever its AF. Otherwise AF <= 0.01 gives 'rare' and anything else
    gives 'common'.
    """
    if len(participant_id) == 1:
        return 'private'
    if AF <= 0.01:
        return 'rare'
    return 'common'
# Row-wise: the label needs both the carrier list and the AF
af_df['prevalence'] = af_df.apply(
    lambda row: classify_prevalence(row['participant_id'], row['AF']),
    axis=1,
)

# Export to TSV

In [None]:
# Final column order for the export; columns not listed here are dropped
reorder_cols:list = [
    # variant
    'CHROM', 'POS', 'REF', 'ALT', 'Gene',
    'HGVSc', 'HGVSp', 'Consequence', 'REVEL', 'SpliceAI',
    # ClinVar-derived labels
    'clinVar_ID', 'clinical_relevance', 'goldstars_n',
    # cohort statistics
    'het_n', 'hom_n', 'hem_n', 'AN', 'AC', 'AF', 'prevalence',
    # per-carrier details
    'participant_id', 'GT', 'AD',
]
af_df = af_df[reorder_cols].copy()

In [None]:
# Serialise the per-carrier lists as "."-joined strings.
# Not idempotent: running this cell a second time would join the characters of each string.
for col in ['participant_id', 'GT', 'AD']:
    af_df[col] = af_df[col].map(".".join)

In [None]:
# Write the summary table as TSV; the CPRA index becomes the first column
export_df = af_df.copy()
export_df.to_csv(
    out_tsv,
    sep='\t',
    index=True,
    index_label='CPRA',
)

# Export to VCF

In [None]:
# Work on a copy in which missing values are written as "."
export_vcf_df = af_df.fillna(value=".")

In [None]:
# Get VCF column fields ready
export_vcf_df = export_vcf_df.rename(columns={
    "CHROM": "#CHROM",
    "clinVar_ID": "ID",
    "het_n": "N_Het",
    "hom_n": "N_Hom",
    "hem_n": "N_Hem"
})

# participant_id / GT / AD are "."-joined strings at this point
export_vcf_df['N_All'] = export_vcf_df['participant_id'].apply(lambda x: len(x.split(".")))
export_vcf_df['NOTE'] = export_vcf_df.apply(lambda r: f"{r.participant_id}-{r.GT}-{r.AD}", axis=1)

export_vcf_df['QUAL'] = "."
export_vcf_df['FILTER'] = export_vcf_df['prevalence'].apply(lambda x: "PRIVATE" if x=="private" else "NON_PRIVATE")
export_vcf_df['FORMAT'] = "GT:AD"
# The single sample column carries the genotype and AD of the first listed carrier only
export_vcf_df['FIRST_SAMPLE'] = export_vcf_df.apply(
    lambda r: f"{r.GT.split('.')[0]}:{r.AD.split('.')[0]}", axis=1
)

# Build INFO as "Gene=...;key=value;..." in one row-wise pass. The previous
# code re-ran a full DataFrame.apply for each of the 14 keys.
info_keys = [
    'Consequence', 'HGVSc', 'HGVSp', 'REVEL', 'SpliceAI',
    'clinical_relevance', 'goldstars_n', 'prevalence',
    'N_Het', 'N_Hom', 'N_Hem', 'N_All', 'AF', 'NOTE',
]
export_vcf_df['INFO'] = export_vcf_df.apply(
    lambda r: ";".join([f"Gene={r['Gene']}"] + [f"{key}={r[key]}" for key in info_keys]),
    axis=1,
)

In [None]:
# Column order of the VCF body; every other column is dropped
vcf_header_cols:list = [
    "#CHROM", "POS", "ID", "REF", "ALT",
    "QUAL", "FILTER", "INFO", "FORMAT",
    "FIRST_SAMPLE",
]
export_vcf_df = export_vcf_df[vcf_header_cols]

In [None]:
# Export to VCF: start from a copy of the header file, then append the records.
# to_csv also writes the column-name line (starting with "#CHROM") before the records.
shutil.copy(vcf_headers, out_vcf)
export_vcf_df.to_csv(
    out_vcf,
    sep='\t',
    mode='a',
    index=False,
)