## Variant filtering script using VEP annotations (dbSNP and ClinVar)
Run this script after variant calling with DeepVariant and annotation with VEP.

In [None]:
# import packages
import sys
import glob
import os
from os import listdir
from os.path import isfile, join
import pysam 
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import matplotlib.pyplot as plt 
import scipy.sparse as sp
from scipy.sparse import csr_matrix 

In [None]:
# define the base directory
# NOTE: the input VCF path and the output TSV path are built by plain string
# concatenation onto this value, so it has to keep its trailing slash
base_dir = "/lustre/scratch126/casm/team274sb/lr26/"

In [None]:
### here are the main parsing functions which are analogous to the COSMIC annotation
# parsing of sample formats in case of multiple alleles
def parse_sample_format(format_str, sample_str):
    """
    Parse the FORMAT and SAMPLE columns of one VCF record into a dictionary.

    Keys are taken from ``format_str`` and values from ``sample_str`` (both
    colon-separated). Four entries are always present and normalised; every
    other FORMAT key keeps its raw string value.

    * ``GT``  - genotype string, ``""`` when absent
    * ``DP``  - depth as ``int``, ``0`` when absent or not an integer
    * ``AD``  - for a comma-separated value, the sum of every entry after the
      first one (i.e. the alternative allele depths); for a single value,
      that value; ``0`` when absent or not numeric
    * ``VAF`` - the largest of the comma-separated values as ``float``,
      ``0.0`` when absent, empty or not numeric

    Returns:
        dict: the parsed sample fields.
    """
    sample_dict = dict(zip(format_str.split(":"), sample_str.split(":")))

    # parses genotype
    sample_dict["GT"] = sample_dict.get("GT", "")

    # parses depth, in case error - 0
    try:
        sample_dict["DP"] = int(sample_dict.get("DP", 0))
    except ValueError:
        sample_dict["DP"] = 0

    # parses allele depth
    ad_val = sample_dict.get("AD", "0")
    if "," in ad_val:
        # A non-numeric entry (e.g. ".") counts as 0 but keeps its position.
        # Dropping it instead would shift the list and make the first ALT
        # depth be skipped as if it were the REF depth.
        # isdecimal() guarantees that int() cannot fail.
        depths = [int(a) if a.isdecimal() else 0 for a in ad_val.split(",")]
        sample_dict["AD"] = sum(depths[1:])  # only alternative allele depths
    else:
        sample_dict["AD"] = int(ad_val) if ad_val.isdecimal() else 0

    # parses vafs by taking the maximal one else 0 and converts to floats
    vaf_val = sample_dict.get("VAF", "0.0")
    try:
        vaf_list = [float(v) for v in vaf_val.split(",") if v]
        sample_dict["VAF"] = max(vaf_list) if vaf_list else 0.0
    except ValueError:
        # any non-numeric entry (e.g. ".") invalidates the whole field
        sample_dict["VAF"] = 0.0

    return sample_dict

# gets genotype based on whether the alleles are both 0, 1 or different each
# unknown if more than 2 alleles
def get_genotype_from_GT(gt_str):
    """
    Classify a VCF GT string as homozygous, heterozygous or unknown.

    Phased (``|``) and unphased (``/``) separators are treated alike.

    Returns:
        str: ``"homozygous"`` when both alleles are identical (this includes
        ``0/0``), ``"heterozygous"`` when they differ, and ``"unknown"`` when
        the genotype does not have exactly two alleles or when either allele
        is missing (``.`` or empty), e.g. ``./.`` or ``./1``.
    """
    alleles = gt_str.replace("|", "/").split("/")
    # A no-call must not be reported as homozygous just because both
    # placeholders happen to be equal.
    if len(alleles) != 2 or "." in alleles or "" in alleles:
        return "unknown"
    return "homozygous" if alleles[0] == alleles[1] else "heterozygous"

# extracts genotype
def extract_gt(sample_str, format_str):
    """
    Return the genotype class of a sample, read from its GT field.

    Note the argument order: the SAMPLE column comes first, the FORMAT
    column second. The GT value ("" when FORMAT has no GT key) is handed to
    get_genotype_from_GT and its result is returned.
    """
    gt = ""
    # pair FORMAT keys with SAMPLE values; a repeated key keeps its last value
    for field_name, field_value in zip(format_str.split(":"), sample_str.split(":")):
        if field_name == "GT":
            gt = field_value
    return get_genotype_from_GT(gt)

# filters variants by sample depth, allele depth and VAFs
def filter_variant_by_sample(sample_dict, min_dp=10, min_ad=3, min_vaf=0.1):
    """
    Tell whether a parsed sample passes the basic quality thresholds.

    Reads "DP", "AD" and "VAF" from sample_dict (missing keys count as 0)
    and requires each of them to be at least min_dp, min_ad and min_vaf
    respectively.
    """
    depth = sample_dict.get("DP", 0)
    alt_depth = sample_dict.get("AD", 0)
    allele_fraction = sample_dict.get("VAF", 0.0)

    # fail on the first threshold that is not reached
    if not (depth >= min_dp):
        return False
    if not (alt_depth >= min_ad):
        return False
    return allele_fraction >= min_vaf

# then it gets vep annotations 
# here they have to match the order in which they are in the vep annotated vcf
def get_vep_annotations(
    info_str,
    csq_field_order=(
        "Allele", "Consequence", "IMPACT", "SYMBOL", "Gene", "Feature_type",
        "Feature", "BIOTYPE", "EXON", "INTRON", "HGVSc", "HGVSp", "cDNA_position",
        "CDS_position", "Protein_position", "Amino_acids", "Codons",
        "Existing_variation", "DISTANCE", "STRAND", "FLAGS", "SYMBOL_SOURCE",
        "HGNC_ID", "SOURCE", "ClinVar", "ClinVar_CLINSIG", "dbSNP", "dbSNP_ID",
        "gencode.sorted.gff3.gz",
    ),
):
    """
    Extract the VEP annotations stored under the CSQ key of a VCF INFO string.

    Args:
        info_str: the INFO column, i.e. ``;``-separated ``key=value`` entries.
        csq_field_order: names given, in order, to the ``|``-separated
            sub-fields of each CSQ entry. They have to match the order used
            in the annotated VCF; pass a different sequence for a VCF that
            was annotated with other VEP options.

    Returns:
        list[dict]: one dictionary per comma-separated CSQ entry, mapping
        field name to its raw string value. Names and values are paired
        positionally, so an entry with fewer sub-fields than names simply
        lacks the trailing keys. The list is empty when INFO has no CSQ key.
    """
    annotations = []

    for entry in info_str.split(";"):
        key, separator, value = entry.partition("=")
        # skip flag-style entries (no "=") and every key other than CSQ
        if not separator or key != "CSQ":
            continue
        for ann in value.split(","):
            annotations.append(dict(zip(csq_field_order, ann.split("|"))))

    return annotations

# impact list - extracts impact from the annotation list
def get_impact_list(ann_list):
    """
    Return the distinct IMPACT values found in a list of annotation dicts.

    Values are returned in order of first appearance, so the result is the
    same on every run (a plain set would be ordered by randomised string
    hashes). Annotations without an "IMPACT" key, e.g. truncated CSQ
    entries, are skipped instead of raising KeyError.
    """
    impacts = (annotation.get("IMPACT") for annotation in ann_list)
    # dict.fromkeys de-duplicates while keeping insertion order
    return list(dict.fromkeys(impact for impact in impacts if impact is not None))

# gets relevant impact type, those that are moderate and high plus their containing fields are considered relevant
def get_impact_type(impact_list):
    """
    Classify a list of impact values as "relevant" or "non-relevant".

    The result is "relevant" when the list holds at least one "MODERATE" or
    "HIGH" entry, and also when the list is empty; otherwise it is
    "non-relevant".
    """
    # nothing to judge: an empty list is kept as relevant
    if len(impact_list) == 0:
        return "relevant"

    relevant = ("MODERATE", "HIGH")
    if any(impact in relevant for impact in impact_list):
        return "relevant"
    return "non-relevant"

# then get the clinical annotation type and flag benign and likely benign
def get_clin_type(ann_list):
    """
    Summarise the ClinVar significance terms of a variant's annotations.

    Every "ClinVar_CLINSIG" value is split on "&" and then on "/" into
    single terms.

    Returns:
        The string "Benign" when any term is "Benign" or "Likely_benign";
        otherwise the list of distinct terms in order of first appearance
        (deterministic across runs). An empty or missing ClinVar_CLINSIG
        contributes the empty string "", so an unannotated variant yields
        [""]; a missing key no longer raises KeyError.
    """
    # dict keys serve as an insertion-ordered set of terms
    terms = {}
    for annotation in ann_list:
        for clin_sig in annotation.get("ClinVar_CLINSIG", "").split("&"):
            terms.update(dict.fromkeys(clin_sig.split("/")))

    if "Benign" in terms or "Likely_benign" in terms:
        return "Benign"
    return list(terms)

# extracts the unique gene symbols (SYMBOL field) from the annotations
def extract_genes(annotations):
    """
    Return the distinct non-empty "SYMBOL" values of the annotations.

    Values are listed in order of first appearance, so the result does not
    change between runs (set ordering depends on randomised string hashes).
    """
    symbols = (ann.get("SYMBOL") for ann in annotations)
    # dict.fromkeys de-duplicates while keeping insertion order
    return list(dict.fromkeys(symbol for symbol in symbols if symbol))
    
# extracts protein position
def extract_protein_positions(annotations):
    """
    Collect the non-empty "Protein_position" value of every annotation.

    Order follows the annotations and duplicates are kept.
    """
    positions = []
    for ann in annotations:
        if ann.get("Protein_position"):
            positions.append(ann["Protein_position"])
    return positions

# extracts snp IDs
def extract_dbsnp_ids(annotations):
    """
    Return the distinct dbSNP identifiers of the annotations.

    For each annotation the "dbSNP" value is used, falling back to "rsid"
    when "dbSNP" is missing or empty; annotations with neither are skipped.
    Identifiers are listed in order of first appearance, so the result does
    not change between runs.
    """
    # look each annotation up once, then drop the empty results
    ids = (ann.get("dbSNP") or ann.get("rsid") for ann in annotations)
    return list(dict.fromkeys(snp_id for snp_id in ids if snp_id))

# extracts the unique VEP consequence terms (Consequence field)
def extract_consequences(annotations):
    """
    Return the distinct consequence values of the annotations.

    For each annotation the "Consequence" value is used, falling back to
    "Conseq" when "Consequence" is missing or empty; annotations with
    neither are skipped. Values are listed in order of first appearance, so
    the result does not change between runs.
    """
    # look each annotation up once, then drop the empty results
    consequences = (ann.get("Consequence") or ann.get("Conseq") for ann in annotations)
    return list(dict.fromkeys(c for c in consequences if c))

In [None]:
# Load one decompressed, VEP-annotated VCF into a dataframe.
# There are 8 files in total: 4x hg38 aligned (mom, blood, tumor whole, tumor somatic)
# and 4x T2T aligned (mom, blood, tumor whole, tumor somatic).
# Every column is read as a string; lines starting with "#" (the VCF header) are skipped.
vcf_columns = ["chr", "pos", "id", "ref", "alt", "qual", "filter", "info", "format", "sample"]
vcf_path = base_dir + "PacBio-deepvariant-tumor-hg38/tumor_vep_annotated_with_clinvar_and_dbsnp_vcf_new_somatic_hg38.vcf"
df = pd.read_csv(
    vcf_path,
    sep="\t",
    comment="#",
    dtype="str",
    header=None,
    names=vcf_columns,
)
df

In [None]:
# work on a copy so that the raw table `df` stays untouched
df_filt = df.copy()
df_filt["qual"] = pd.to_numeric(df_filt["qual"], errors='coerce')  # convert in the copy

# do some initial pre-filtering so that annotation would not be taking too long
# (.copy() makes df_filtered an independent frame: new columns are assigned to
#  it afterwards, which on a plain slice triggers pandas' SettingWithCopyWarning)
df_filtered = df_filt[
    (df_filt["filter"] == "PASS") &
    (df_filt["qual"] > 5)
].copy()

In [None]:
# extract sample info
# Built by zipping the two columns rather than with a row-wise DataFrame.apply:
# on an empty frame (no variant left after pre-filtering) row-wise apply does
# not return a Series, which breaks the column assignment; iterating the two
# columns is also faster than building a Series for every row.
df_filtered["sample_info"] = pd.Series(
    [
        parse_sample_format(format_str, sample_str)
        for format_str, sample_str in zip(df_filtered["format"], df_filtered["sample"])
    ],
    index=df_filtered.index,
    dtype=object,
)

# add specific fields as separate columns - depth, allele depth, allele frequencies, genotypes
df_filtered["DP"]  = df_filtered["sample_info"].apply(lambda x: x["DP"])
df_filtered["AD"]  = df_filtered["sample_info"].apply(lambda x: x["AD"])
df_filtered["VAF"] = df_filtered["sample_info"].apply(lambda x: x["VAF"])
df_filtered["GT"]  = df_filtered["sample_info"].apply(lambda x: x["GT"])
df_filtered["genotype"] = df_filtered["GT"].apply(get_genotype_from_GT)

# flag (do not drop) the variants that pass the default depth / allele depth / VAF thresholds
df_filtered["pass_sample_filters"] = df_filtered["sample_info"].apply(filter_variant_by_sample)

# extract vep annotations as a list of dictionaries, one per CSQ entry
df_filtered["vep_ann"] = df_filtered["info"].apply(get_vep_annotations)

# get impact info and type
df_filtered["impact_list"] = df_filtered["vep_ann"].apply(get_impact_list)
df_filtered["impact_type"] = df_filtered["impact_list"].apply(get_impact_type)

# get clinical significance ("Benign" or a list of ClinVar terms)
df_filtered["clin_sig"] = df_filtered["vep_ann"].apply(get_clin_type)

# get genes, protein positions, dbsnp ids, consequences
df_filtered["genes"] = df_filtered["vep_ann"].apply(extract_genes)
df_filtered["protein_positions"] = df_filtered["vep_ann"].apply(extract_protein_positions)
df_filtered["dbsnp_ids"] = df_filtered["vep_ann"].apply(extract_dbsnp_ids)
df_filtered["consequences"] = df_filtered["vep_ann"].apply(extract_consequences)

In [None]:
# Final, more stringent selection: keep variants that have a relevant impact type
# and carry some ClinVar annotation which is not considered Benign.
# NOTE(review): the "pass_sample_filters" flag is not part of this selection;
# only AD > 5 is applied here - confirm that this is intended.
is_pass = df_filtered["filter"] == "PASS"
qual_ok = df_filtered["qual"] > 5
alt_depth_ok = df_filtered["AD"] > 5
impact_ok = df_filtered["impact_type"] == "relevant"
not_benign = df_filtered["clin_sig"] != "Benign"
# at least one non-blank ClinVar term must be present
has_clin_sig = df_filtered["clin_sig"].apply(
    lambda terms: any(term.strip() for term in terms if isinstance(term, str))
)

df_fin = df_filtered[
    is_pass & qual_ok & alt_depth_ok & impact_ok & not_benign & has_clin_sig
]

In [None]:
# write the final table as a tab-separated file (e.g. to view in Excel);
# the dataframe index is written as the first column
out_tsv = base_dir + "PacBio-deepvariant-tumor-hg38/tumor_vep_annotated_with_clinvar_and_dbsnp_vcf_new_somatic_hg38_filtered.tsv"
df_fin.to_csv(out_tsv, sep="\t")

In [None]:
### Then this script can be run for all remaining samples