In [15]:
import os
import pandas as pd
import numpy as np
import logging
from tqdm import tqdm
import subprocess
import vcfpy

In [16]:
# Root directory holding the input VCF files and the sample annotation table.
data_path = "/project/datacamp/team11/data"
# Directory that receives the generated genotype matrices and variant tables.
output_path = os.path.join(data_path, "preprocessed")
sample_annotation_file = os.path.join(data_path, "igsr-1000 genomes 30x on grch38.tsv")
# Chromosomes 1..22, kept as strings because they are spliced into file names.
chr_list = [str(x) for x in range(1,23)]
# Genotype dict for converting a phased GT string -> integer code.
# GT strings that are not listed here are encoded as len(gt_dict) by preprocess_vcf.
gt_dict = {"0|0" :0, "0|1" : 1, "1|0" : 2, "1|1" : 3 }

# exist_ok=True makes the cell safe to re-run and avoids the race between a
# separate existence check and the directory creation.
os.makedirs(output_path, exist_ok=True)

In [17]:
def count_vcf_line_count(filename):
    """Count the lines of a plain-text VCF file that do not start with ``#``.

    The result is used by ``preprocess_vcf`` as the expected number of variant
    records (to size its buffers and its progress bar).

    The file is read directly in Python instead of through a
    ``grep ... | wc -l`` shell pipeline, so paths containing spaces or shell
    metacharacters are handled correctly and an unreadable file raises an
    error instead of silently yielding 0.

    Parameters
    ----------
    filename : str or os.PathLike
        Path to an uncompressed VCF file.

    Returns
    -------
    int
        Number of lines whose first character is not ``#``.

    Raises
    ------
    OSError
        If the file cannot be opened (e.g. ``FileNotFoundError``).
    """
    # Binary mode: no decoding cost and no failure on unexpected bytes; only
    # the first byte of each line matters for the header test.
    with open(filename, "rb") as vcf_handle:
        return sum(1 for line in vcf_handle if not line.startswith(b"#"))

In [18]:
def preprocess_vcf(vcf_file_name, sample_annotation_file, output_file_prefix):
    """Encode the informative SNVs of one VCF file as an integer genotype matrix.

    The VCF is streamed record by record. Records that are not SNVs are
    skipped, and so are SNVs whose genotype code is 0 for every sample. The
    remaining genotypes are encoded with the module-level ``gt_dict``; a ``GT``
    value that is not a key of ``gt_dict`` gets the code ``len(gt_dict)``.

    Two files are written:

    * ``{output_file_prefix}_matrix.npy`` -- int8 array of shape
      (#samples, #kept variants); rows follow the order of the
      ``"Sample name"`` column of the annotation table.
    * ``{output_file_prefix}_variant.csv`` -- CHROM, POS, REF and ALT of every
      kept variant, in the same order as the matrix columns. Several ALT
      alleles on one record are joined with ``","``.

    Parameters
    ----------
    vcf_file_name : str
        Path to the (uncompressed) VCF file to convert.
    sample_annotation_file : str
        Path to a tab-separated table with a ``"Sample name"`` column.
    output_file_prefix : str
        Path prefix for the two output files.

    Raises
    ------
    KeyError
        If the annotation table lists a sample that is not in the VCF header.
    AssertionError
        If the number of parsed records differs from the counted line total,
        or a record has a different number of calls than the header samples.
    """
    vcf_line_count = count_vcf_line_count(vcf_file_name)
    print(f"----- processing {vcf_file_name} file with {vcf_line_count} variants -----")

    variant_info_header = ['CHROM', 'POS', 'REF', 'ALT']
    # Rows are collected in a list and turned into a DataFrame once; filling a
    # preallocated DataFrame row by row is much slower.
    variant_info_rows = []

    status_counter = {
        "total_variant" : 0,
        "total_snps" : 0,
        "total_meaningful_snps" : 0
    }

    # Code given to any GT value that is not a key of gt_dict.
    unknown_gt_code = len(gt_dict)

    reader = vcfpy.Reader.from_path(vcf_file_name)
    try:
        sample_names_from_vcf = reader.header.samples.names

        # Validate the annotation table before the (long) parsing loop so that a
        # mismatch fails immediately instead of after the whole file was read.
        sample_annotation = pd.read_csv(sample_annotation_file, sep="\t")
        sample_name_to_idx = {name : idx for idx, name in enumerate(sample_names_from_vcf)}
        missing_samples = [name for name in sample_annotation["Sample name"]
                           if name not in sample_name_to_idx]
        if missing_samples:
            raise KeyError(
                f"{len(missing_samples)} annotated sample(s) are missing from {vcf_file_name}, "
                f"e.g. {missing_samples[0]!r}"
            )
        indices_for_sort_sample = [sample_name_to_idx[name] for name in sample_annotation["Sample name"]]

        # One row per VCF record is the upper bound; only the first
        # total_meaningful_snps rows end up being used.
        gt_mat_raw = np.zeros((vcf_line_count, len(sample_names_from_vcf)), dtype=np.int8)

        for record in tqdm(reader, total = vcf_line_count):
            status_counter["total_variant"] += 1
            if not record.is_snv():
                continue
            status_counter["total_snps"] += 1

            gt_int = [gt_dict.get(call.data.get('GT'), unknown_gt_code) for call in record.calls]
            # Skip variants where every sample has code 0: they carry no signal.
            if not any(gt_int):
                continue

            assert len(gt_int) == gt_mat_raw.shape[1]
            next_row = status_counter["total_meaningful_snps"]
            gt_mat_raw[next_row, :] = gt_int

            # Join ALT alleles into one field so that a multi-allelic SNV still
            # yields exactly one value per column of variant_info_header.
            alt_field = ",".join(alt.value for alt in record.ALT)
            variant_info_rows.append([record.CHROM, str(record.POS), record.REF, alt_field])
            status_counter["total_meaningful_snps"] += 1
    finally:
        # Release the file handle even when parsing fails half-way.
        reader.close()

    assert(status_counter["total_variant"] == vcf_line_count)
    print(status_counter)

    # Drop the unused tail, switch to (#samples, #features) and reorder the rows
    # to match the annotation table.
    gt_mat = gt_mat_raw[:status_counter["total_meaningful_snps"], :].transpose()[indices_for_sort_sample,:]
    variant_info_df = pd.DataFrame(variant_info_rows, columns=variant_info_header)

    numpy_save_file_name = f"{output_file_prefix}_matrix.npy"
    pandas_save_file_name = f"{output_file_prefix}_variant.csv"
    print(f"genotype matrix shape (#samples, #features) : {gt_mat.shape} -> saved to {numpy_save_file_name}")
    print(f"sample annotations (#samples, ) : {sample_annotation.shape}")
    print(f"variant info dataframe (#features, ): {variant_info_df.shape} -> saved to {pandas_save_file_name}")

    np.save(numpy_save_file_name, gt_mat)
    variant_info_df.to_csv(pandas_save_file_name, sep=",", index = False)

### Read each VCF file and convert it to matrix format

In [19]:
## Smoke test on a small VCF file; uncomment one of the calls below to run it
#preprocess_vcf("/home/jinhyun/data/1kGP/temp.vcf", 
#               os.path.join(data_path, "temp_igsr.tsv"), 
#               os.path.join(output_path, "chr_test"))
    
#preprocess_vcf(os.path.join(data_path, "test.vcf"), 
#               sample_annotation_file, 
#               os.path.join(output_path, "chrT"))

----- processing /project/datacamp/team11/data/test.vcf file with 4889 variants -----


100%|██████████| 4889/4889 [00:46<00:00, 105.06it/s]


{'total_variant': 4889, 'total_snps': 4243, 'total_meaningful_snps': 4243}
genotype matrix shape (#samples, #features) : (3202, 4243) -> saved to /project/datacamp/team11/data/preprocessed/chrT_matrix.npy
sample annotations (#samples, ) : (3202, 9)
variant info dataframe (#features, ): (4243, 4) -> saved to /project/datacamp/team11/data/preprocessed/chrT_variant.csv


In [None]:
# Convert every chromosome in chr_list, walking the list back to front.
# The loop variable is named `chrom` rather than `chr` so that the builtin
# chr() is not shadowed for the remaining cells of the notebook.
for chrom in reversed(chr_list):
    vcf_file_name = os.path.join(data_path, f"1kGP_high_coverage_Illumina.chr{chrom}.filtered.SNV_INDEL_SV_phased_panel.vcf")
    # A missing chromosome file is reported and skipped so the others still run.
    if not os.path.exists(vcf_file_name):
        logging.warning(f"can not find vcf file for chromosome {chrom}")
        continue

    preprocess_vcf(vcf_file_name,
        sample_annotation_file,
        os.path.join(output_path, f"chr{chrom}"))