In [11]:
import time
import glob

import glow
spark = glow.register(spark)
import pyspark.sql.functions as fx
from pyspark.sql.types import *
from pyspark.sql.functions import *
from random import sample

In [2]:
vcf_path_glob = 'data/*20R.strelka.somatic.snvs.vcf.gz'
vcf_paths = glob.glob(vcf_path_glob)
vcf_path = vcf_paths[0]

def load_vcf(path, includeSampleIds=True):
    vcf = (
      spark
      .read
      .format("vcf")
      .load(path, includeSampleIds=includeSampleIds)
    )
    return vcf

def filter_pass(vcf):
    return vcf.filter(array_contains(col("filters"), 'PASS'))

def get_only_tumor_df(vcf, cols2keep=None):
    if cols2keep is None:
        cols2keep = [e for e in vcf.columns if e not in ['genotypes']]
    
    #
    # this is a proper way to explot the genotypes array but due to a bug? / Delta API changes? it does not work
    #vcf.select(element_at(col("genotypes"),1).alias('tumor')).select(glow.expand_struct("tumor"))
    tvcf = vcf.select(cols2keep + [vcf.genotypes.getItem(1).alias('tumor')])
    return tvcf.select(cols2keep + ["tumor.*"]).drop('sampleId')
    
def get_tumor_and_normal_id_from_strelka_somatic_vcf_path(path):
    extension = '.strelka.somatic.snvs.vcf.gz'
    tvsn = os.path.basename(path)[:-len(extension)]
    try:
        tsid,nsid = tvsn.split('_vs_')
        return (tsid,nsid)
    except ValueError:
        print("For tumor and normal ID inference the filename of Strelka somatic VCF "+
              "is expected to be [TUMOR_vs_NORMAL" + extension + "].\nIt was " + os.path.basename(path))
    
def load_strelka_somatic_vcf(path, tumor_sample_id=None, normal_sample_id=None):
    tsid,nsid = get_tumor_and_normal_id_from_strelka_somatic_vcf_path(path)
    if (tumor_sample_id is None):
        tumor_sample_id = tsid
    if (normal_sample_id is None):
        normal_sample_id = nsid
    
    vcf = load_vcf(path)
    fvcf = filter_pass(vcf)
    tvcf = get_only_tumor_df(fvcf)
    return tumor_sample_id, tvcf



## Load VCF

### Load single

In [3]:
%%time
vcf = load_vcf(vcf_path)
vcf.printSchema()

                                                                                

root
 |-- contigName: string (nullable = true)
 |-- start: long (nullable = true)
 |-- end: long (nullable = true)
 |-- names: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- referenceAllele: string (nullable = true)
 |-- alternateAlleles: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- qual: double (nullable = true)
 |-- filters: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- splitFromMultiAllelic: boolean (nullable = true)
 |-- INFO_SomaticEVS: double (nullable = true)
 |-- INFO_QSS_NT: integer (nullable = true)
 |-- INFO_PNOISE2: double (nullable = true)
 |-- INFO_MQ0: integer (nullable = true)
 |-- INFO_ReadPosRankSum: double (nullable = true)
 |-- INFO_TQSS: integer (nullable = true)
 |-- INFO_SNVSB: double (nullable = true)
 |-- INFO_DP: integer (nullable = true)
 |-- INFO_PNOISE: double (nullable = true)
 |-- INFO_QSS: integer (nullable = true)
 |-- INFO_MQ: double (nullable = true)
 |-- INFO

#### Load and filter

In [12]:
%%time
tid, vcf = load_strelka_somatic_vcf(vcf_path)



CPU times: user 39.8 ms, sys: 8.93 ms, total: 48.8 ms
Wall time: 419 ms


In [17]:
vcf.head(2)



[Row(contigName='1', start=97952, end=97953, names=None, referenceAllele='A', alternateAlleles=['G'], qual=None, filters=['PASS'], splitFromMultiAllelic=False, INFO_SomaticEVS=9.59, INFO_QSS_NT=58, INFO_PNOISE2=None, INFO_MQ0=28, INFO_ReadPosRankSum=-1.26, INFO_TQSS=2, INFO_SNVSB=0.0, INFO_DP=99, INFO_PNOISE=None, INFO_QSS=58, INFO_MQ=19.92, INFO_SGT='AA->AG', INFO_SOMATIC=True, INFO_NT='ref', INFO_TQSS_NT=2, FDP=0, GU=[9, 17], AU=[28, 39], depth=38, SUBDP=0, TU=[0, 0], SDP=0, CU=[1, 1]),
 Row(contigName='1', start=924291, end=924292, names=None, referenceAllele='A', alternateAlleles=['G'], qual=None, filters=['PASS'], splitFromMultiAllelic=False, INFO_SomaticEVS=17.71, INFO_QSS_NT=65, INFO_PNOISE2=None, INFO_MQ0=0, INFO_ReadPosRankSum=0.78, INFO_TQSS=1, INFO_SNVSB=0.0, INFO_DP=82, INFO_PNOISE=None, INFO_QSS=65, INFO_MQ=59.86, INFO_SGT='AA->AG', INFO_SOMATIC=True, INFO_NT='ref', INFO_TQSS_NT=1, FDP=2, GU=[10, 12], AU=[44, 45], depth=57, SUBDP=0, TU=[0, 0], SDP=0, CU=[1, 1])]

In [166]:
%%time
vcf.count()



CPU times: user 4.84 ms, sys: 0 ns, total: 4.84 ms
Wall time: 748 ms




5911

### Load 5 VCFs

In [110]:
%%time

vcf_df = None
for p in vcf_paths:
    tid, vcf = load_strelka_somatic_vcf(p)
    new_vcf = vcf.withColumn('sampleId', lit(tid)) #spark.createDataFrame([(tid, vcf)], schema)
    vcf_df = new_vcf if vcf_df is None else vcf_df.unionAll(new_vcf)
    



CPU times: user 154 ms, sys: 22.7 ms, total: 177 ms
Wall time: 999 ms




#### Count variants per sample

In [118]:
%%time

def get_variants_per_sample(df):
    df.groupBy('sampleId').count().collect()
    
get_variants_per_sample(vcf_df)




CPU times: user 41.2 ms, sys: 0 ns, total: 41.2 ms
Wall time: 9.13 s


                                                                                

#### Get min POS for each sample and each chromosome

In [119]:
%%time

def count_min_pos_per_chromosome(df):
    df.groupBy('sampleId', 'contigName').agg(fx.min(df.start)).collect()
    
count_min_pos_per_chromosome(vcf_df)



CPU times: user 28.3 ms, sys: 11.2 ms, total: 39.6 ms
Wall time: 8.87 s


                                                                                

## Make data lake

In [None]:
def save_datalake(vcf_obj, path):
    (
      vcf_obj
      .write
      .format("delta")
      .mode("overwrite")
      .save(path)
    )

def load_datalake(path):
    delta_vcf = spark.read.format("delta").load(path)    
    return delta_vcf

### Single sample

In [124]:
%%time
single_lake = "delta/test-delta"
save_datalake(vcf, single_lake)



CPU times: user 10.6 ms, sys: 4.02 ms, total: 14.7 ms
Wall time: 10.2 s




### Multisample

In [125]:
%%time
multi_lake = "delta/test-delta-multi"

save_datalake(vcf_df, multi_lake)

21/08/10 00:52:17 WARN DAGScheduler: Broadcasting large task binary with size 1169.8 KiB

CPU times: user 18.1 ms, sys: 25.1 ms, total: 43.2 ms
Wall time: 32.7 s


                                                                                

## Load datalake

In [126]:
%%time
delta_vcfs = load_datalake(multi_lake)
delta_vcfs.show(n=2)

+----------+------+------+-----+---------------+----------------+----+-------+---------------------+---------------+-----------+------------+--------+-------------------+---------+----------+-------+-----------+--------+-------+--------+------------+-------+------------+---+------+--------+-----+-----+--------+---+------+-------------+
|contigName| start|   end|names|referenceAllele|alternateAlleles|qual|filters|splitFromMultiAllelic|INFO_SomaticEVS|INFO_QSS_NT|INFO_PNOISE2|INFO_MQ0|INFO_ReadPosRankSum|INFO_TQSS|INFO_SNVSB|INFO_DP|INFO_PNOISE|INFO_QSS|INFO_MQ|INFO_SGT|INFO_SOMATIC|INFO_NT|INFO_TQSS_NT|FDP|    GU|      AU|depth|SUBDP|      TU|SDP|    CU|     sampleId|
+----------+------+------+-----+---------------+----------------+----+-------+---------------------+---------------+-----------+------------+--------+-------------------+---------+----------+-------+-----------+--------+-------+--------+------------+-------+------------+---+------+--------+-----+-----+--------+---+------+-

In [127]:
%%time
delta_vcfs.count()

CPU times: user 2.03 ms, sys: 112 µs, total: 2.15 ms
Wall time: 423 ms


71174

## Timings

### Filtering

In [132]:
def get_to_G_subs(df):
    return df.filter(array_contains(col("alternateAlleles"), 'G'))

In [135]:
%%time
get_to_G_subs(vcf_df).count()

                                                                                

In [136]:
%%time
get_to_G_subs(delta_vcfs).count()

CPU times: user 3.49 ms, sys: 552 µs, total: 4.05 ms
Wall time: 448 ms


16360

### Filter and select columns

In [144]:
%%time
get_to_G_subs(vcf_df).select(vcf_df.start).collect()[0:10]



CPU times: user 77.8 ms, sys: 799 µs, total: 78.6 ms
Wall time: 6.74 s


                                                                                

[Row(start=97952),
 Row(start=924291),
 Row(start=1522384),
 Row(start=2625453),
 Row(start=4281550),
 Row(start=5792658),
 Row(start=5873748),
 Row(start=6539592),
 Row(start=7560292),
 Row(start=11157675)]

In [143]:
%%time
get_to_G_subs(delta_vcfs).select(delta_vcfs.start).collect()[0:10]

CPU times: user 140 ms, sys: 3.85 ms, total: 144 ms
Wall time: 559 ms


[Row(start=526071),
 Row(start=924615),
 Row(start=1125050),
 Row(start=1194479),
 Row(start=1205188),
 Row(start=1530945),
 Row(start=1695385),
 Row(start=2038188),
 Row(start=2354417),
 Row(start=2630709)]

### Select column on full

In [148]:
%%time
vcf_df.select(col("filters"), col("qual")).collect()[0:10]

                                                                                

CPU times: user 318 ms, sys: 10.9 ms, total: 329 ms
Wall time: 6.82 s


[Row(filters=['PASS'], qual=None),
 Row(filters=['PASS'], qual=None),
 Row(filters=['PASS'], qual=None),
 Row(filters=['PASS'], qual=None),
 Row(filters=['PASS'], qual=None),
 Row(filters=['PASS'], qual=None),
 Row(filters=['PASS'], qual=None),
 Row(filters=['PASS'], qual=None),
 Row(filters=['PASS'], qual=None),
 Row(filters=['PASS'], qual=None)]

In [147]:
%%time
delta_vcf.select(col("filters"), col("qual")).collect()[0:10]

CPU times: user 222 ms, sys: 1.59 ms, total: 223 ms
Wall time: 667 ms


[Row(filters=['PASS'], qual=None),
 Row(filters=['PASS'], qual=None),
 Row(filters=['PASS'], qual=None),
 Row(filters=['PASS'], qual=None),
 Row(filters=['PASS'], qual=None),
 Row(filters=['PASS'], qual=None),
 Row(filters=['PASS'], qual=None),
 Row(filters=['PASS'], qual=None),
 Row(filters=['PASS'], qual=None),
 Row(filters=['PASS'], qual=None)]

### Count records (on full)

In [149]:
%%time
vcf_df.count()



CPU times: user 18.4 ms, sys: 7.06 ms, total: 25.5 ms
Wall time: 6.84 s


                                                                                

71174

In [150]:
%%time
delta_vcfs.count()

CPU times: user 3.35 ms, sys: 0 ns, total: 3.35 ms
Wall time: 359 ms


71174

In [None]:
#
# code examples from the manual
#

#phenotypes_path = '/databricks-datasets/genomics/1000G/phenotypes.normalized'
#reference_genome_path = "/dbfs/databricks-datasets/genomics/grch37/data/human_g1k_v37.fa"
#vcf_output_path = "dbfs:/home/genomics/vcf/subset.vcf"


#genotype = delta_vcf.where((fx.col("contigName") == '22') & 
#                           (fx.col("start") == 1234567)). \
#                     selectExpr("contigName", "start", "filter(genotypes, g -> g.sampleId = '{0}') as genotypes".format(sample_id))

## Class impl that do not work 
#### (problematic inheritance from DataFrame)

In [None]:
vcf_path_glob = 'data/*20R.strelka.somatic.snvs.vcf.gz'
vcf_paths = glob.glob(vcf_path_glob)
vcf_path = vcf_paths[0]

class VCF(DataFrame):
    
    path = None
    vcf = None
    
    def __init__(self, path):
        self = self.__load_vcf(path)
        self.path = path
    
    def __load_vcf(self, path, includeSampleIds=True):
        vcf = (
          spark
          .read
          .format("vcf")
          .load(path, includeSampleIds=includeSampleIds)
        )
        return vcf
   
    
    def filter_PASS(self):
        self.vcf = self.vcf.filter(array_contains(col("filters"), 'PASS'))
        return self
        
        
class StrelkaSomaticSnvVCF(VCF):

    extension = '.strelka.somatic.snvs.vcf.gz'
    
    def __init__(self, path):
        super().__init__(path)
    
    def get_only_tumor_df(self, cols2keep=None):
        if cols2keep is None:
            cols2keep = [e for e in self.vcf.columns if e not in ['genotypes']]
    
        #
        # this is a proper way to explot the genotypes array but due to a bug? / Delta API changes? it does not work
        #vcf.select(element_at(col("genotypes"),1).alias('tumor')).select(glow.expand_struct("tumor"))
        tvcf = self.vcf.select(cols2keep + [self.vcf.genotypes.getItem(1).alias('tumor')])
        self.vcf = tvcf.select(cols2keep + ["tumor.*"]).drop('sampleId')
        return self
    
    def get_tumor_and_normal_ids_from_path(self):
        tvsn = os.path.basename(self.path)[:-len(self.extension)]
        try:
            tsid,nsid = tvsn.split('_vs_')
            return (tsid,nsid)
        except ValueError:
            print("For tumor and normal ID inference the filename of Strelka somatic VCF "+
                  "is expected to be [TUMOR_vs_NORMAL" + self.extension + "].\n"
                  "It was " + os.path.basename(self.path))
    
    def get_tumor_PASS_variants():
        return self.filter_pass().get_only_tumor_df()
        
