## Parallel Call Variants with Spark Deep Variant

### Start Spark with dependencies

In [3]:
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("Spark Deep Variant") \
        .config("spark.driver.memory", "12G") \
        .config("spark.jars", "./spark-deepvariant-assembly-1.0.2.jar") \
        .master("local[8]") \
        .getOrCreate()
spark

## Define the Pipeline

In [4]:
from sparkdv.transformers import VariantCallerModel, MakeExamples
from pyspark.ml import PipelineModel

In [5]:
make_examples = MakeExamples()\
  .setMode("calling")\
  .setRefPath("./reference/GRCh38_no_alt_analysis_set.fasta")\
  .setReadsPathCol("filename")\
  .setRegions("chr20")

variant_caller = VariantCallerModel()\
  .load("./variant_caller_fast")\
  .setUseGPU(False)

pipeline = PipelineModel([make_examples, variant_caller])

In [6]:
pipeline.stages

[MakeExamples_5fb8a63ac7e7, VariantCallerModel_7fcdc78167d6]

## Load data

In [8]:
!ls ../../franco/input

HG003.novaseq.pcr-free.35x.dedup.grch38_no_alt.chr20.bam
HG003.novaseq.pcr-free.35x.dedup.grch38_no_alt.chr20.bam.bai


In [7]:
from sparkdv.datasets.dataset_reader import BamReader

reads_path = "../../franco/input"
reads_df = BamReader(). \
  setTasksPerBamReadsFile(96). \
  setTasksGroupSize(1). \
  loadReads(spark, reads_path)

In [9]:
help(BamReader().setTasksGroupSize)

Help on method setTasksGroupSize in module sparkdv.datasets.dataset_reader:

setTasksGroupSize(k) method of sparkdv.datasets.dataset_reader.BamReader instance
    Sets the group size for task groups that will be run for files in this dataset.
    Each input file is partitioned in multiple tasks and these tasks are grouped, in a number
    of groups, each of size k
    Parameters:
    - k (int): This is the group size for each of the task groups used to process a file.
    
    Returns:
    BamReader: returns itself.



In [10]:
reads_df.rdd.getNumPartitions()

2

In [11]:
reads_df.printSchema()

root
 |-- streams: array (nullable = true)
 |    |-- element: binary (containsNull = true)
 |-- filename: string (nullable = true)
 |-- taskIds: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- version: string (nullable = true)
 |-- groups: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- attributes: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- key: string (nullable = true)
 |    |    |    |    |-- value: string (nullable = true)



In [12]:
reads_df.select("groups", "taskIds").limit(100).show(truncate=False, n=100)

+-----------------------------------------------------------------------------------+-------+
|groups                                                                             |taskIds|
+-----------------------------------------------------------------------------------+-------+
|[{HG003.pcr-free, [{PL, ILLUMINA}, {PU, NONE}, {LB, HG003.pcr-free}, {SM, HG003}]}]|[0]    |
|[{HG003.pcr-free, [{PL, ILLUMINA}, {PU, NONE}, {LB, HG003.pcr-free}, {SM, HG003}]}]|[1]    |
|[{HG003.pcr-free, [{PL, ILLUMINA}, {PU, NONE}, {LB, HG003.pcr-free}, {SM, HG003}]}]|[2]    |
|[{HG003.pcr-free, [{PL, ILLUMINA}, {PU, NONE}, {LB, HG003.pcr-free}, {SM, HG003}]}]|[3]    |
|[{HG003.pcr-free, [{PL, ILLUMINA}, {PU, NONE}, {LB, HG003.pcr-free}, {SM, HG003}]}]|[4]    |
|[{HG003.pcr-free, [{PL, ILLUMINA}, {PU, NONE}, {LB, HG003.pcr-free}, {SM, HG003}]}]|[5]    |
|[{HG003.pcr-free, [{PL, ILLUMINA}, {PU, NONE}, {LB, HG003.pcr-free}, {SM, HG003}]}]|[6]    |
|[{HG003.pcr-free, [{PL, ILLUMINA}, {PU, NONE}, {LB, HG003.p

### Plot number of partitions
Num of partitions determine how many tasks we can actually run in parallel

In [11]:
reads_df.rdd.getNumPartitions()

2

## Make Examples

In [13]:
reads_df.limit(2).select("filename", "taskIds").show()

+--------------------+-------+
|            filename|taskIds|
+--------------------+-------+
|file:/home/franco...|    [0]|
|file:/home/franco...|    [1]|
+--------------------+-------+



In [14]:
examples = make_examples.transform(reads_df.limit(2)).cache()

In [15]:
(examples.rdd.getNumPartitions(), examples.count())

(1, 2542)

In [23]:
examples.printSchema()

root
 |-- streams: array (nullable = true)
 |    |-- element: binary (containsNull = true)
 |-- filename: string (nullable = true)
 |-- taskIds: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- version: string (nullable = true)
 |-- groups: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- attributes: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- key: string (nullable = true)
 |    |    |    |    |-- value: string (nullable = true)
 |-- localBamPath: string (nullable = true)
 |-- locus: string (nullable = true)
 |-- pileup: binary (nullable = true)



In [16]:
examples.select("locus", "pileup").show(n=40)

+--------------------+--------------------+
|               locus|              pileup|
+--------------------+--------------------+
| chr20:192100-192100|[1F 8B 08 00 00 0...|
| chr20:288158-288158|[1F 8B 08 00 00 0...|
| chr20:288204-288204|[1F 8B 08 00 00 0...|
| chr20:288232-288232|[1F 8B 08 00 00 0...|
| chr20:288365-288365|[1F 8B 08 00 00 0...|
| chr20:480423-480424|[1F 8B 08 00 00 0...|
| chr20:672388-672390|[1F 8B 08 00 00 0...|
| chr20:672590-672590|[1F 8B 08 00 00 0...|
| chr20:768267-768267|[1F 8B 08 00 00 0...|
| chr20:864029-864029|[1F 8B 08 00 00 0...|
| chr20:864080-864080|[1F 8B 08 00 00 0...|
| chr20:960776-960776|[1F 8B 08 00 00 0...|
|chr20:1248012-124...|[1F 8B 08 00 00 0...|
|chr20:1248073-124...|[1F 8B 08 00 00 0...|
|chr20:1248088-124...|[1F 8B 08 00 00 0...|
|chr20:1248109-124...|[1F 8B 08 00 00 0...|
|chr20:1248128-124...|[1F 8B 08 00 00 0...|
|chr20:1440771-144...|[1F 8B 08 00 00 0...|
|chr20:1536402-153...|[1F 8B 08 00 00 0...|
|chr20:1536402-153...|[1F 8B 08 

## Variant Caller Model

In [17]:
variant_caller = VariantCallerModel()\
  .load("./variant_caller_fast")\
  .setUseGPU(False)

In [18]:
probabilities = variant_caller.transform(examples)
probabilities.printSchema()

root
 |-- streams: array (nullable = true)
 |    |-- element: binary (containsNull = true)
 |-- filename: string (nullable = true)
 |-- taskIds: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- version: string (nullable = true)
 |-- groups: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- attributes: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- key: string (nullable = true)
 |    |    |    |    |-- value: string (nullable = true)
 |-- localBamPath: string (nullable = true)
 |-- locus: string (nullable = true)
 |-- probabilities: array (nullable = true)
 |    |-- element: float (containsNull = true)



In [19]:
probabilities.limit(20).select("locus", "probabilities").show(truncate=False)

+---------------------+----------------------------------------+
|locus                |probabilities                           |
+---------------------+----------------------------------------+
|chr20:192100-192100  |[1.3759445E-7, 9.2800654E-8, 0.99999976]|
|chr20:288158-288158  |[0.9999949, 5.0948224E-6, 3.114292E-8]  |
|chr20:288204-288204  |[0.99999833, 1.7198603E-6, 2.4491207E-8]|
|chr20:288232-288232  |[0.99999833, 1.6657666E-6, 2.181632E-8] |
|chr20:288365-288365  |[0.9999989, 1.0229137E-6, 1.0515295E-8] |
|chr20:480423-480424  |[8.096211E-5, 0.99991846, 5.590518E-7]  |
|chr20:672388-672390  |[3.9953765E-4, 0.9995989, 1.5630934E-6] |
|chr20:672590-672590  |[2.0235846E-6, 1.2812563E-6, 0.99999666]|
|chr20:768267-768267  |[0.06088002, 0.93894625, 1.7369262E-4]  |
|chr20:864029-864029  |[2.0736522E-6, 2.8318205E-8, 0.99999785]|
|chr20:864080-864080  |[5.8574433E-6, 0.9999932, 1.0114603E-6] |
|chr20:960776-960776  |[1.5106653E-5, 0.9999846, 1.9621521E-7] |
|chr20:1248012-1248012|[1

## CPU vs. GPU

#### CPU

In [20]:
%%time
probabilities.select("locus", "probabilities").write.parquet("cpu_results.parquet")

CPU times: user 27.4 ms, sys: 3.85 ms, total: 31.2 ms
Wall time: 2min 19s


#### GPU

In [16]:
variant_caller = VariantCallerModel()\
  .load("./variant_caller_fast")\
  .setUseGPU(True)

examples = examples.repartition(1)
probabilities = variant_caller.transform(examples)

In [17]:
%%time
probabilities.select("locus", "probabilities").write.parquet("gpu_results.parquet")

CPU times: user 7.2 ms, sys: 427 µs, total: 7.63 ms
Wall time: 22.5 s


## Run end-to-end pipeline - store to disk
In this section we will show how to run the entire pipeline end-to-end, and how to store results back a file in a DFS.

In [25]:
make_examples = MakeExamples()\
  .setMode("calling")\
  .setRefPath("./reference/GRCh38_no_alt_analysis_set.fasta")\
  .setReadsPathCol("filename")\
  .setOutputExamplePath("./examples")\
  .setRegions("chr20")


variant_caller = VariantCallerModel()\
  .load("./variant_caller_fast")\
  .setUseGPU(True)

pipeline = PipelineModel([make_examples, variant_caller])

In [None]:
%%time
pipeline.transform(reads_df).select("filename", "locus", "probabilities")\
  .write.parquet("./probabilities.parquet")

In [None]:
results = spark.read.parquet("./probabilities.parquet")