In [None]:
import hail as hl
hl.init(quiet=True, skip_logging_configuration=True)

In [None]:
hl.spark_context().setLogLevel('ERROR')

# Prepare VEP Input
- Read the VCF
- Subset variants to about 2500 (to be small enough for this demo)
- Export in Parallel (6 parts)

In [None]:
mt = hl.import_vcf('data/1kg.vcf.bgz')
mt.count()

In [None]:
mt = mt.sample_rows(0.03)
mt.count()

In [None]:
mt = mt.repartition(6)
hl.export_vcf(mt, 'results/parallel.vcf.bgz', parallel='header_per_shard')

# Run VEP for each VCF part in parallel

In [None]:
%%bash
for file in $(ls results/parallel.vcf.bgz/part-*.bgz)
do
    file=$(basename -- $file)
    file=${file%.*}
    echo $file
    vep --everything --fork 2 --format vcf --input_file results/parallel.vcf.bgz/$file.bgz --json --compress_output bgzip -output_file results/vep_jsons/$file.json.bgz --cache --offline --assembly GRCh37 --merged &
done
wait

# Fix JSON files for Hail and find schema of each part (in parallel)

In [None]:
%%bash
for file in $(ls -1 results/vep_jsons/part-*.bgz)
do
    file=$(basename -- $file)
    file=${file%.*}
    file=${file%.*}
    echo $file
    echo python ../src/process.py results/vep_jsons/$file.json.bgz results/hail_jsons/$file.json.bgz results/hail_jsons/$file.schema.pickle &
    python ../src/process.py results/vep_jsons/$file.json.bgz results/hail_jsons/$file.json.bgz results/hail_jsons/$file.schema.pickle &
done
wait

# Merge all schema together

In [None]:
%%bash
python ../src/combine.py "results/hail_jsons/*.schema.pickle" results/final_schema.txt

In [None]:
with open('results/final_schema.txt', 'r') as file:
    schema = file.read()
    print(schema)

# Load all the fixed JSONs in parallel

In [None]:
ht = hl.import_table('results/vep_jsons/part-*.json.bgz', no_header=True).cache()
ht.count()

In [None]:
ht.show()

# Parse Fixed VEP JSONs

In [None]:
ht = ht.transmute(vep = hl.parse_json(ht.f0, dtype=schema))
ht.describe()

In [None]:
ht.show()

In [None]:
def Flatten(ht, feildName):
    ht = ht.flatten()
    expr = {k:k.replace(f'{feildName}.','') for k in ht.row}
    ht = ht.rename(expr)
    return ht

In [None]:
def BreakList(ht, feildName, indexName):
    htKeys = list(ht.key)
    assert len(htKeys)==1, 'Table must have one key column'
    assert ht.count()==ht.distinct().count(), 'Table key must be unique for each row'
    htKey = htKeys[0]

    fu = ht.select(feildName)
    ht = ht.drop(feildName)
    fu = fu.explode(feildName)
    expr = {htKey: hl.agg.collect(fu[htKey])}
    fu = fu.group_by(fu[feildName]).aggregate(**expr)
    fu = fu.add_index(indexName)
    fu = fu.key_by(indexName)
    mm = fu.select(htKey)
    fu = fu.drop(htKey)
    mm = mm.explode(htKey)
    
    fu = Flatten(fu, feildName)
    fu = fu.key_by(indexName)
    
    return ht, fu, mm

In [None]:
ht = Flatten(ht, 'vep')
ht = ht.transmute(variant = hl.str(':').join([ht.seq_region_name, hl.str(ht.start), ht.allele_string.replace('/', ':')]))
ht = ht.key_by(ht.variant)

In [None]:
ht.describe()

In [None]:
ht, tc, tc_mm = BreakList(ht, 'transcript_consequences', 'tcId')
ht, ic, ic_mm = BreakList(ht, 'intergenic_consequences', 'icId')
ht, rc, rc_mm = BreakList(ht, 'regulatory_feature_consequences', 'rcId')
ht, mc, mc_mm = BreakList(ht, 'motif_feature_consequences', 'mcId')
ht, cv, cv_mm = BreakList(ht, 'colocated_variants', 'cvId')

In [None]:
tc.show()

In [None]:
tc, tc_ct, tc_ct_mm = BreakList(tc, 'consequence_terms', 'ctId')
ic, ic_ct, ic_ct_mm = BreakList(ic, 'consequence_terms', 'ctId')
rc, rc_ct, rc_ct_mm = BreakList(rc, 'consequence_terms', 'ctId')
mc, mc_ct, mc_ct_mm = BreakList(mc, 'consequence_terms', 'ctId')
mc, mc_tf, mc_tf_mm = BreakList(mc, 'transcription_factors', 'tfId')

In [None]:
mc.describe()