In [None]:
# GCS bucket that receives the finished report. Give the bare bucket name only:
# no "gs://" prefix and no trailing slash. The Dataproc service account must be
# able to write to it.
output_bucket = "clingen-dataproc-workspace-kferrite"
# Relative path of the report TSV. The same string is used as the Hail export
# path, as the local copy path (under the working directory) and as the object
# path inside output_bucket. It may contain "/" (e.g. "folder/file.tsv") but
# must not start with one.
report_filename = "clinvar-annotation-af-revstatus.tsv"


In [None]:
import hail as hl
# `idempotent=True` makes hl.init a no-op once Hail is already initialized,
# so the notebook can be re-run top to bottom ("Run All") in the same kernel
hl.init(idempotent=True)

In [None]:
# utility functions for file placement
import subprocess
import os
import time, datetime

def run_args(args, fail_on_stderr=False, success_codes=(0,)) -> tuple: # (stdout,stderr,returncode)
    """
    Run a command given as an argument list (no shell) and capture its output.

    The command list is printed first so the notebook output records what ran.

    Parameters:
        args: command and its arguments, e.g. ["gsutil", "cp", src, dst].
        fail_on_stderr: if True, any output on stderr is also treated as failure.
        success_codes: exit codes considered successful (any container supporting `in`).

    Returns:
        (stdout, stderr, returncode), with stdout/stderr as bytes.

    Raises:
        RuntimeError: if the exit code is not in success_codes, or if
            fail_on_stderr is set and stderr is non-empty.
    """
    print(args)
    proc = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if (fail_on_stderr and proc.stderr) or proc.returncode not in success_codes:
        # Decode stderr so the message shows readable text instead of a b'...' repr
        raise RuntimeError("command {} failed with code {}:{}".format(
            args, proc.returncode, proc.stderr.decode("utf-8", errors="replace")))
    return (proc.stdout, proc.stderr, proc.returncode)

def _to_gcs_uri(path: str) -> str:
    """Return `path` unchanged if it is a gs:// URI, otherwise gs://<output_bucket>/<path>."""
    if path.startswith("gs://"):
        return path
    return "gs://{}/{}".format(output_bucket, path)

def local_to_bucket(local_path:str, gcs_path:str):
    """
    Upload the local file `local_path` to GCS with `gsutil cp`.

    `gcs_path` may be a full gs:// URI or a path relative to `output_bucket`.
    Raises RuntimeError (from run_args) if gsutil fails.
    """
    run_args(["gsutil", "cp", local_path, _to_gcs_uri(gcs_path)])

def bucket_to_local(gcs_path:str, local_path:str):
    """
    Download a GCS object to the local file `local_path` with `gsutil cp`.

    `gcs_path` may be a full gs:// URI or a path relative to `output_bucket`.
    The local parent directory (e.g. "input_files/") is created first if it is missing.
    Raises RuntimeError (from run_args) if gsutil fails.
    """
    local_dir = os.path.dirname(local_path)
    if local_dir:
        # Harmless if gsutil would have created it anyway
        os.makedirs(local_dir, exist_ok=True)
    run_args(["gsutil", "cp", _to_gcs_uri(gcs_path), local_path])
    
def local_to_hdfs(local_path:str, hdfs_path:str):
    """
    Copy a local file into HDFS at `hdfs_path`, replacing whatever is there.

    A relative `local_path` is resolved against the current working directory,
    because the file:// URI handed to hadoop needs an absolute path.
    """
    absolute_local = local_path if local_path.startswith("/") else os.path.join(os.getcwd(), local_path)
    # Remove any previous copy first; exit status 1 (e.g. nothing to remove) is tolerated
    run_args(["hdfs", "dfs", "-rm", hdfs_path], success_codes=[0,1])
    run_args(["hadoop", "fs", "-cp", "file://" + absolute_local, hdfs_path])
    
def hdfs_to_local(hdfs_path:str, local_path:str):
    """
    Copy `hdfs_path` out of HDFS into the local file `local_path`, overwriting it.

    A relative `local_path` is resolved against the current working directory, the
    same way local_to_hdfs does. Otherwise "file://" + "dir/x.tsv" would put "dir" in
    the URI's host position. Missing local parent directories are created, since the
    report path may be nested (e.g. "folder/file.tsv").
    Raises RuntimeError (from run_args) if the hdfs copy fails.
    """
    if not local_path.startswith("/"):
        local_path = os.path.join(os.getcwd(), local_path)
    if os.path.exists(local_path):
        os.remove(local_path)
    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    run_args(["hdfs", "dfs", "-cp", hdfs_path, "file://" + local_path])

In [None]:
# Load the input variation sheet (TSV) from the bucket into a Hail Table
import io, re
import pandas as pd
import numpy as np

# Input sheet: TSV stored at this path both in output_bucket and locally
input_filename = "input_files/sheet3.tsv"
bucket_to_local(input_filename, input_filename)

# Read every column as a string so IDs keep their exact text
pd_df = pd.read_csv(input_filename, sep='\t', dtype=str)
# Turn NaN (empty cells) into None so Hail treats them as missing values.
# The dict form is deliberate: in pandas releases before 1.4, a scalar
# `to_replace` with `value=None` is interpreted as a forward-fill ("pad").
# That silently copies the previous row's value into empty cells.
pd_df = pd_df.astype(object).replace({np.nan: None})
input_ds = hl.Table.from_pandas(pd_df)
input_ds = input_ds.persist()
input_ds.describe()
input_ds.show()
print("Loaded {} variations".format(input_ds.count()))
# print(df)

In [None]:
import io
import re

# Read gnomAD data as Hail Tables
# ds_exomes = hl.read_table(
#     "gs://gcp-public-data--gnomad/release/2.1.1/ht/exomes/gnomad.exomes.r2.1.1.sites.ht")
# ds_exomes = ds_exomes.annotate(
#     source="gnomAD Exomes")

# ds_genomes = hl.read_table(
#     "gs://gcp-public-data--gnomad/release/2.1.1/ht/genomes/gnomad.genomes.r2.1.1.sites.ht")
GNOMAD_GENOMES_V3_1_HT = "gs://gcp-public-data--gnomad/release/3.1/ht/genomes/gnomad.genomes.v3.1.sites.ht"
# gnomAD v3.1 genomes sites table (public GCS bucket)
ds_genomes = hl.read_table(GNOMAD_GENOMES_V3_1_HT)
# ds_genomes = ds_genomes.annotate(
#     source="gnomAD Genomes")

# Can perform a union here if wanting both (ds = ds1.union(ds2))
def select_necessary_cols(ds):
    """Trim a gnomAD table to the row fields used downstream: freq, faf and vep."""
    wanted_fields = (ds.freq, ds.faf, ds.vep)
    return ds.select(*wanted_fields)

# Only the gnomAD v3.1 genomes table is used; `gnomad` is the name the rest of the notebook reads
gnomad = gnomad_genomes = select_necessary_cols(ds_genomes)

# Persist to disk the filtered table and reload
# gnomad = gnomad.checkpoint("hail-checkpoints/gnomad.genomes.v3.1.sites.ht")
# gnomad = gnomad.persist()

In [None]:
# Print the schema of the trimmed gnomAD table (row fields freq, faf, vep plus globals)
gnomad.describe()
# gnomad.show()
# If exists in genomes and exomes, pick genome
# gnomad_locus_counts = (gnomad.group_by(gnomad.locus)
#                             .aggregate(count=hl.agg.count()))
# gnomad_locus_counts.show()

# gnomad = gnomad.annotate(
#     locus_count=gnomad.filter(gnomad.locus==gnomad.locus).count()
# )


# gnomad = gnomad.filter(
#     (gnomad.source == hl.str("gnomAD Exomes")) & (gnomad.filter(gnomad.locus==gnomad.locus).count() > 1)
#     #hl.any(lambda genome: genome.locus == gnomad.locus, gnomad_genomes)

# )

# gnomad.show()

In [None]:
# Predicates over one (faf_index_dict label, index) pair, deciding which FAF entries count

def GNOMAD_SUPERPOP(t):
    """
    Match labels of the form "<pop>-adj" where <pop> is a lowercase population code.

    Restricting to lowercase keeps sex-stratified aggregate labels such as "XX-adj" or
    "XY-adj" (if the dict contains them) from being treated as populations; a plain
    [a-zA-Z] class would accept them. Returns a re.Match (truthy) or None.
    NOTE(review): compare against the printed faf_index_dict for the release in use.
    """
    return re.match("^[a-z]+-adj$", t[0])

def ANY(t):
    """Accept every entry."""
    return True

# By default, filter to superpopulation aggregate FAFs
def faf_filter(faf_idx_tuple:tuple):
    return GNOMAD_SUPERPOP(faf_idx_tuple)

gnomad_faf_index_dict = hl.eval(gnomad.globals.faf_index_dict)
print(gnomad_faf_index_dict)

# (label, index) pairs that faf_filter keeps
[item for item in gnomad_faf_index_dict.items() if faf_filter(item)]

In [None]:
"""
ds.freq has raw frequency information, including AN, AC, and pop label. This is an array of 
structs, at indices determined by the categories in ds.globals.freq_index_dict

ds.faf has filtering allele frequency information, including faf95 and faf99 (computed at 95% and 99% confidence).
This is an array of structs, at indices determined by the category map in ds.globals.faf_index_dict
"""

def add_popmax_af(ds):
    """
    Add popmax (highest population FAF) annotations to a Hail Table of gnomAD fields.

    Candidate populations are the keys of ds.globals.faf_index_dict of the form
    "<pop>-adj" with a lowercase population code (e.g. "afr-adj"). Among those
    entries of ds.faf, the one with the highest faf95 is the popmax.

    Adds the row fields:
      popmax_faf            -- the winning ds.faf struct.
      popmax_index_dict_key -- the faf_index_dict key of the winner (e.g. "afr-adj").
                               This is similar to, but not the same as,
                               popmax_faf.meta["pop"] (e.g. "afr-adj" vs "afr").
                               Missing when ds.faf is missing, i.e. the variant had
                               no gnomAD match.
      popmax_faf_pop_freq   -- the ds.freq struct for the same key, looked up via
                               ds.globals.freq_index_dict (AC, AF, AN, homozygote_count).

    Only the single highest population is kept per variant. Reporting every
    matching population would need an explode() over the candidates instead.

    Parameters
    ----------
    ds : hl.Table
        Must have:
          - row fields `faf` and `freq` (arrays of structs);
          - global dicts `faf_index_dict` / `freq_index_dict` mapping labels to
            array indices. freq_index_dict must contain every selected faf label.

    Returns
    -------
    hl.Table
        `ds` with the three fields above added.

    Raises
    ------
    ValueError
        If no faf_index_dict key matches the population pattern.
    """
    def is_population_faf(label_and_index):
        # Population codes in these labels are lowercase (afr, amr, eas, ...). Requiring
        # lowercase keeps sex-stratified aggregate labels such as "XX-adj"/"XY-adj" (if the
        # dict has them) from being picked as a population.
        # NOTE(review): compare against the printed faf_index_dict for the release in use.
        return re.fullmatch(r"[a-z]+-adj", label_and_index[0]) is not None

    faf_index_dict = hl.eval(ds.globals.faf_index_dict)
    print(faf_index_dict)

    # (label, index) pairs of the FAF entries eligible to be the popmax
    faf_index_map = [item for item in faf_index_dict.items() if is_population_faf(item)]
    if not faf_index_map:
        # With no candidates, the [0] below would fail inside Hail at run time
        raise ValueError(
            "No population-level keys found in faf_index_dict: {}".format(sorted(faf_index_dict)))

    # Rank the candidates by 95% FAF, highest first. Ranking once and reusing the winner
    # keeps popmax_faf and popmax_index_dict_key in agreement.
    # NOTE(review): on ties (e.g. every faf95 == 0) the winner depends on how the sort
    # orders equal keys; confirm if that matters for the report.
    ranked = hl.sorted(
        hl.literal(faf_index_map, dtype=hl.tarray(hl.ttuple(hl.tstr, hl.tint32))),
        key=lambda label_and_index: ds.faf[label_and_index[1]].faf95,
        reverse=True,
    )
    top_label = ranked[0][0]
    top_index = ranked[0][1]

    ds = ds.annotate(
        popmax_faf=ds.faf[top_index],
        # Without a gnomAD record there is no population to report
        popmax_index_dict_key=hl.or_missing(hl.is_defined(ds.faf), top_label),
    )

    # freq_index_dict uses the same keys as faf_index_dict, so the popmax key also indexes freq
    ds = ds.annotate(
        popmax_faf_pop_freq=ds.freq[ds.globals.freq_index_dict[ds.popmax_index_dict_key]]
    )

    return ds


# gnomad_with_popmax = add_popmax_af(gnomad)
# gnomad_with_popmax.show()

In [None]:
# Import ClinVar VCF as Hail Table
# clinvar = hl.import_vcf("/path/to/clinvar.vcf.gz", force_bgz=True, drop_samples=True, skip_invalid_loci=True).rows()

# Download clinvar BGZF
import os, requests, subprocess
import gzip

# Function to download a file to a localpath. ClinVar VCF is small enough to download to dataproc default local disk.
def download_to_file(url, filepath, chunk_size=1024 * 1024, timeout=300):
    """
    Stream the resource at `url` into the local file `filepath`.

    Parameters:
        url: HTTP(S) URL to fetch.
        filepath: local destination path; overwritten if it exists.
        chunk_size: bytes read per iteration while streaming.
        timeout: seconds passed to requests.get. It bounds connecting and each
            socket read, not the total download time.

    Raises:
        RuntimeError: if the server responds with anything other than HTTP 200.
    """
    # The `with` block closes the streamed response (releasing its connection) on every path
    with requests.get(url, stream=True, timeout=timeout) as r:
        if r.status_code != 200:
            raise RuntimeError("Failed to obtain ClinVar VCF:{}\n{}".format(r.status_code, r.reason))
        with open(filepath, "wb") as fout:
            for chunk in r.iter_content(chunk_size=chunk_size):
                if chunk:  # skip empty chunks
                    fout.write(chunk)
                
def add_chr_to_vcf_contig(filepath:str) -> str:
    """
    Prefix "chr" to the CHROM of every record on contigs 1-22, X and Y.

    Takes a gzip-compressed VCF path ending in ".gz". Writes the result next to it,
    with ".gz" replaced by ".1.gz" (e.g. clinvar.vcf.gz -> clinvar.vcf.1.gz), and
    returns that new path. Other contigs (e.g. "MT") and header lines pass through
    unchanged. The output always ends with a newline.

    Raises:
        ValueError: if `filepath` does not end with ".gz". The old replace-based
            naming mapped such a path onto itself and truncated the input.
    """
    if not filepath.endswith(".gz"):
        raise ValueError("Expected a .gz file path, got: {}".format(filepath))
    # Swap only the trailing suffix; str.replace would also rewrite a ".gz" earlier in the path
    output_filepath = filepath[:-len(".gz")] + ".1.gz"
    # Built once instead of once per line; compared as bytes, so no decode/encode round-trip
    contigs = frozenset([str(i).encode("ascii") for i in range(1, 23)] + [b"X", b"Y"])
    with gzip.open(filepath, "rb") as f_in, gzip.open(output_filepath, "wb") as f_out:
        for line in f_in:
            # Text before the first tab is CHROM on data lines; "#" header lines never match
            chrom = line.split(b"\t", 1)[0]
            if chrom in contigs:
                line = b"chr" + line
            if not line.endswith(b"\n"):
                line += b"\n"
            f_out.write(line)
    return output_filepath
                
                
# ClinVar keeps this URL pointed at its latest GRCh38 VCF dump, refreshed periodically
clinvar_vcf_url = "https://ftp.ncbi.nlm.nih.gov/pub/clinvar/vcf_GRCh38/clinvar.vcf.gz"
# NOTE(review): assumes the notebook user's home is /home/hail on this cluster, confirm
clinvar_vcf_localpath = "/home/hail/clinvar.vcf.gz"
clinvar_vcf_hdfs = "clinvar.vcf"

download_to_file(clinvar_vcf_url, clinvar_vcf_localpath)
assert os.path.exists(clinvar_vcf_localpath)
clinvar_vcf_size = os.path.getsize(clinvar_vcf_localpath)
print("Downloaded ClinVar VCF, file size (expecting ~30M): %d" % clinvar_vcf_size)

print("Updating contigs")
clinvar_fixed_localpath = add_chr_to_vcf_contig(clinvar_vcf_localpath)
# Decompress to a plain VCF: there is no bgzip command here to recompress it as block gzip (BGZF)
def unzip(filepath:str) -> str:
    """
    Decompress a gzip file next to itself and return the decompressed path.

    The output path is `filepath` with its trailing ".gz" removed; an existing file
    there is overwritten.

    Raises:
        ValueError: if `filepath` does not end with ".gz". An explicit check is used
            because `assert` is skipped under `python -O`.
    """
    import shutil

    if not filepath.endswith(".gz"):
        raise ValueError("Expected a .gz file path, got: {}".format(filepath))
    output_filepath = filepath[:-len(".gz")]
    with gzip.open(filepath, "rb") as f_in, open(output_filepath, "wb") as f_out:
        # Buffered chunk copy instead of iterating line by line
        shutil.copyfileobj(f_in, f_out)
    return output_filepath
clinvar_unzipped_fixed_localpath = unzip(clinvar_fixed_localpath)

# Place the uncompressed VCF at clinvar_vcf_hdfs, the path import_vcf reads below
local_to_hdfs(clinvar_unzipped_fixed_localpath, clinvar_vcf_hdfs)

# Keep only the variant rows (no samples); records whose locus is invalid for
# GRCh38 are skipped instead of failing the import
clinvar = hl.import_vcf(
    clinvar_vcf_hdfs,
    reference_genome="GRCh38",
    skip_invalid_loci=True,
    drop_samples=True,
).rows()
print("Imported {} records from ClinVar".format(clinvar.count()))


In [None]:
# Filter ClinVar down to the variation IDs listed in the input sheet.
# ClinVar's VCF ID column (imported by Hail as `rsid`) is matched against clinvar_variation_id.
input_variation_ids = input_ds.clinvar_variation_id.collect()
# A typed set literal gives one hash lookup per row instead of scanning the whole ID list.
# The explicit dtype keeps it valid even for an empty list. Empty sheet cells (None) are
# dropped here; they could never match a ClinVar ID anyway.
clinvar_id_set = hl.literal(
    {vid for vid in input_variation_ids if vid is not None},
    dtype=hl.tset(hl.tstr),
)
clinvar = clinvar.filter(clinvar_id_set.contains(clinvar.rsid))
print("ClinVar filtered to {} rows".format(clinvar.count()))

In [None]:
clinvar.describe()
# Keep the ID (`rsid`) plus only the ClinVar INFO fields the report uses, in this order
clinvar_info_fields = ("CLNSIG", "CLNSIGCONF", "CLNREVSTAT", "CLNDN")
clinvar = clinvar.select(
    clinvar.rsid,
    info=hl.struct(**{field: clinvar.info[field] for field in clinvar_info_fields}),
)
clinvar.describe()

In [None]:
# Attach each input row's ClinVar record by matching the sheet's clinvar_variation_id
# against ClinVar's `rsid` (VCF ID) field. Rows with no ClinVar match get a missing `clinvar`.
input_clinvar = input_ds.key_by("clinvar_variation_id")
input_clinvar = input_clinvar.annotate(
    clinvar=clinvar.key_by("rsid")[input_clinvar.clinvar_variation_id]
)

print("Number of partitions originally: {}".format(input_clinvar.n_partitions()))
# Re-key by ClinVar's (locus, alleles) so rows can be looked up in gnomAD, which the later
# join indexes by locus and alleles. Later cells refer to these key fields as "locus"/"alleles".
# Rows without a ClinVar match end up with missing key values.
input_clinvar = input_clinvar.key_by(input_clinvar.clinvar.locus, input_clinvar.clinvar.alleles)
# Full shuffle into 64 partitions. NOTE(review): tuning constant, rationale not recorded here.
input_clinvar = input_clinvar.repartition(64, shuffle=True)
# Write and read back so downstream joins start from materialized data. The relative path
# resolves against Hail's default filesystem (presumably HDFS on this cluster, confirm).
input_clinvar = input_clinvar.checkpoint("hail-checkpoints/input_clinvar.ht", overwrite=True)

In [None]:


# import time
# print("Persisting clinvar")
# start = time.time()
# clinvar_persistent = clinvar.persist()
# # gnomad_persistent = gnomad.persist()
# end = time.time()
# print("Persisting took {} seconds".format((end - start)))


# print("Checkpointing clinvar")
# start = time.time()
# clinvar_persistent = clinvar_persistent.checkpoint("hail-checkpoints/clinvar.ht", overwrite=True)
# end = time.time()
# print("Checkpointing took {} seconds".format((end - start)))


In [None]:
# gnomad_persistent.describe()
# gnomad_filtered = gnomad_persistent.filter(
# #     clinvar.any(clinvar.locus == gnomad_persistent.locus & clinvar.alleles = gnomad_persistent.alleles)
#     hl.is_defined(clinvar_persistent[gnomad_persistent.locus, gnomad_persistent.alleles])
# )

# gnomad_filtered_count = gnomad_filtered.count()
# print("Gnomad filtered to {} rows".format(gnomad_filtered_count))

In [None]:
import time
# Combine gnomAD and ClinVar: every input row keeps its place and gains a `gnomad`
# struct, which is missing when the variant is not in gnomAD
print("Left joining gnomad to input_clinvar")
clinvar_variant_key = (input_clinvar.clinvar.locus, input_clinvar.clinvar.alleles)
input_clinvar_gnomad = input_clinvar.annotate(gnomad=gnomad[clinvar_variant_key])
# count = gnomad_clinvar.count()
# end = time.time()
# print("Counting took {} seconds".format((end - start)))
# print("Gnomad/Clinvar combined to {} rows".format(count))

In [None]:
# input_annotated = input_ds.key_by("clinvar_variation_id")
# clinvar_by_id = clinvar_persistent.key_by("rsid").persist()
# input_annotated = input_annotated.annotate(
#     clinvar=clinvar_by_id[input_annotated.clinvar_variation_id]
# )

# input_annotated = input_annotated.annotate(
#     gnomad=gnomad_filtered[input_annotated.clinvar.locus, input_annotated.clinvar.alleles]
# )

# input_annotated = input_annotated.annotate(
#     locus=input_annotated.clinvar.locus,
#     alleles=input_annotated.clinvar.alleles
# )

# input_annotated = input_annotated.key_by("locus", "alleles")

In [None]:
# END MODIFICATIONS

In [None]:
# import time
# print("Persisting gnomad_clinvar to memory and disk")
# start = time.time()
# gnomad_clinvar_persisted = gnomad_clinvar.persist()
# end = time.time()
# print("Persisting gnomad_clinvar took {} seconds".format((end - start)))

In [None]:
# Filter gnomad to records in input_ds
# input_clinvar = input_ds.annotate(
#     clinvar=clinvar.key_by(clinvar.rsid)[input_ds.clinvar_variation_id]
# )
# input_clinvar = input_clinvar.key_by(
#     input_clinvar.clinvar.locus, input_clinvar.clinvar.alleles
# )
# input_clinvar.describe()
# input_clinvar.show()

In [None]:
import datetime
# Print the current wall-clock time (a marker for timing the cells that follow)
print(datetime.datetime.now())

In [None]:
with_popmax = input_clinvar_gnomad
with_popmax.describe()

# Copy gnomAD's index dicts into this table's globals. add_popmax_af finds faf/freq
# entries through ds.globals.faf_index_dict and ds.globals.freq_index_dict.
with_popmax = with_popmax.annotate_globals(
    freq_index_dict=hl.eval(gnomad.globals.freq_index_dict),
    faf_index_dict=hl.eval(gnomad.globals.faf_index_dict),
)
# Lift the joined gnomAD faf/freq arrays to top-level row fields, where add_popmax_af reads them
with_popmax = with_popmax.annotate(
    faf=with_popmax.gnomad.faf,
    freq=with_popmax.gnomad.freq,
)
with_popmax.describe()

# Compute popmax and annotate
with_popmax = add_popmax_af(with_popmax)

In [None]:
# Schema after add_popmax_af added popmax_faf, popmax_index_dict_key and popmax_faf_pop_freq
with_popmax.describe()

In [None]:
# list(with_popmax.row.keys())

In [None]:
# with_popmax.key.collect()
# Field names of the input sheet's table; they become the leading report columns
input_table_columns = [field_name for field_name in input_ds.row.keys()]
print(input_table_columns)

In [None]:
# Report = the input sheet's columns, then ClinVar fields, then gnomAD popmax fields.
# Sheet columns: Query, CA ID, ClinVar Name, ClinVar ID, 37 Coordinates, 38 Coordinates,
# Gene, NM, NP, clinvar_variation_id
clinvar_info = with_popmax.clinvar.info
report_fields = {
    # ClinVar INFO arrays flattened to comma-separated strings
    "clinvar_review_status": hl.delimit(clinvar_info["CLNREVSTAT"], ","),
    "clinvar_significance": hl.delimit(clinvar_info["CLNSIG"], ","),
    "clinvar_significance_interpretations": hl.delimit(clinvar_info["CLNSIGCONF"], ","),
    # ClinVar separates CLNDN condition names with '|' because ',' may appear inside a
    # name, but Hail splits the field on ','. Re-joining with ',' restores ClinVar's string.
    "clinvar_conditions": hl.delimit(clinvar_info["CLNDN"], ","),
    # gnomAD popmax: faf_index_dict key of the top population and that population's AC/AN
    "popmax_pop": with_popmax.popmax_index_dict_key,
    "popmax_ac": with_popmax.popmax_faf_pop_freq.AC,
    "popmax_an": with_popmax.popmax_faf_pop_freq.AN,
}
output_ds = with_popmax.annotate(**report_fields)
# with_popmax is already keyed by (locus, alleles); keying explicitly keeps that key in place
output_ds = output_ds.key_by("locus", "alleles")
output_ds = output_ds.select(*input_table_columns, *report_fields)
output_ds.describe()


In [None]:
# Report the partition count of the final table before exporting it
print("Output table number of partitions: {}".format(output_ds.n_partitions()))

In [None]:
# Select desired output fields (columns are ordered as provided)
# output_ds = with_popmax
# # Query	CA ID	ClinVar Name	ClinVar ID	37 Coorindates	38 Coordinates	Gene	NM	NP	clinvar_variation_id
# output_ds = output_ds.select(
#     # already keyed by clinvar_variation_id
#     clinvar_review_status=hl.delimit(output_ds.clinvar.info["CLNREVSTAT"], ","),
#     clinvar_significance=hl.delimit(output_ds.clinvar.info["CLNSIG"], ","),
#     clinvar_significance_interpretations=hl.delimit(output_ds.clinvar.info["CLNSIGCONF"], ","),
#     # Hail parses the CLNDN (and related like CLNDNINCL) incorrectly
#     # Since ',' is allowed in condition names, ClinVar uses '|' to separate them
#     # But Hail separates into an array based on ',' instead of '|'
#     # If we re-join the string with ',' it will match that from ClinVar
#     clinvar_conditions=hl.delimit(output_ds.clinvar.info["CLNDN"], ","),
    
#     # gnomad popmax things
#     popmax_pop = output_ds.popmax_faf.meta["pop"],
#     popmax_ac = output_ds.popmax_faf_pop_freq.AC,
#     popmax_an = output_ds.popmax_faf_pop_freq.AN,
# )

# output_ds = output_ds.order_by(
#     hl.int(output_ds.clinvar_variation_id) # Assume all clinvar variation ids are integers
# )

# output_ds.describe()

# Export to TSV
import time
print("Starting export to %s" % report_filename)
# Wall-clock timing of the Hail TSV export
export_started = time.time()
output_ds.export(report_filename)
export_seconds = time.time() - export_started
print("Export took %.2f seconds" % export_seconds)

In [None]:
# Hail wrote the export to its default filesystem (presumably HDFS on this cluster, confirm).
# Copy it to the same relative path under the current working directory.
report_localpath = os.path.join(os.getcwd(), report_filename)
hdfs_to_local(report_filename, report_localpath)

In [None]:
# Upload the local copy to gs://<output_bucket>/<report_filename>
# (local_to_bucket prefixes non-gs:// paths with output_bucket)
print("Uploading {} bytes to GCS".format(os.path.getsize(report_localpath)))
local_to_bucket(report_localpath, report_filename)