## Initialize spark context
This code creates a Spark cluster on k8s with 3 executors. It also configures the Hadoop S3 client (using S3A). Both the `s3a` and `s3` schemes are supported. After the cell below has run, you can check that the executors have been created by running `kubectl get po`. Expected output:
```
NAME                              READY   STATUS      RESTARTS   AGE
minio                             1/1     Running     0          9m1s
spark-2f2b6f90ef906e54-exec-1     1/1     Running     0          50s
spark-2f2b6f90ef906e54-exec-2     1/1     Running     0          48s
spark-2f2b6f90ef906e54-exec-3     1/1     Running     0          48s
spark-master-87999d7db-zg72s      1/1     Running     0          13m
```

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions

# Build (or reuse) a Spark session whose executors are scheduled as pods by
# the in-cluster Kubernetes API server. Options are grouped by concern below.
spark = (
    SparkSession.builder
    .master("k8s://https://kubernetes.default.svc.cluster.local:443")
    .appName("spark")
    # Point the Ivy cache/home at /tmp for resolving spark.jars.packages.
    .config('spark.driver.extraJavaOptions', '-Divy.cache.dir=/tmp -Divy.home=/tmp')
    .config('spark.kubernetes.file.upload.path', 's3://hail/spark')
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")
    # Route the plain s3:// scheme to the S3A implementation so that both
    # s3:// and s3a:// URLs work.
    .config('spark.hadoop.fs.AbstractFileSystem.s3.impl', 'org.apache.hadoop.fs.s3a.S3A')
    .config('spark.hadoop.fs.s3.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
    .config('spark.hadoop.fs.s3a.fast.upload', 'true')
    # The endpoint below is plain HTTP, so SSL is disabled to keep the two
    # settings consistent (the original had 'true' next to an http:// URL).
    .config('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false')
    .config('spark.hadoop.fs.s3a.path.style.access', 'true')
    .config('spark.hadoop.fs.s3a.endpoint', 'http://minio:9000')
    # NOTE(review): hard-coded MinIO default credentials -- acceptable for a
    # local demo only; inject real credentials from a secret elsewhere.
    .config('spark.hadoop.fs.s3a.access.key', 'minioadmin')
    .config('spark.hadoop.fs.s3a.secret.key', 'minioadmin')
    .config("spark.executor.instances", 3)
    # Client deploy mode: executors connect back to the driver using the
    # host name and fixed ports configured here.
    .config("spark.submit.deployMode", "client")
    .config("spark.driver.host", "spark-master")
    .config("spark.driver.port", "8002")
    .config("spark.blockManager.port", "8001")
    .config("spark.kubernetes.namespace", "default")
    .config("spark.kubernetes.container.image", "ferlabcrsj/hail")
    # Pull policy "Never": the image is never pulled from a registry.
    .config("spark.kubernetes.container.image.pullPolicy", "Never")
    .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
    .config("spark.kubernetes.authenticate.executor.serviceAccountName", "spark")
    # Kryo serializer with Hail's registrator, set up front because Hail is
    # later initialised on top of this existing Spark context.
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrator", "is.hail.kryo.HailKryoRegistrator")
    .getOrCreate()
)

## Initialize hail with the spark context previously created

In [None]:
import hail as hl
# Attach Hail to the Spark context created earlier instead of letting Hail
# start its own.
hl.init(sc=spark.sparkContext)

# Smoke test: generate a small Balding-Nichols dataset
# (3 populations, 1000 samples, 1000 variants) and display it.
bn_dataset = hl.balding_nichols_model(n_populations=3, n_samples=1000, n_variants=1000)
bn_dataset.show()


## Read a file using s3a scheme

In [None]:
# Load every gVCF matching the glob through the s3a:// scheme as lines of
# text, then preview the first 10 rows as a pandas DataFrame.
s3a_lines = spark.read.text('s3a://hail/gvcf/*.gvcf.gz')
s3a_lines.limit(10).toPandas()

## Read a file using s3 scheme

In [None]:
# Same read as the s3a:// example, but through the plain s3:// scheme;
# preview the first 10 rows as a pandas DataFrame.
s3_lines = spark.read.text('s3://hail/gvcf/*.gvcf.gz')
s3_lines.limit(10).toPandas()

## Combine gVCF with Hail
The gVCF files are in the object store. The VDS will also be stored in the object store.

In [None]:
# Input gVCFs to merge, one object-store path per sample.
gvcf_inputs = [
    's3://hail/gvcf/S16907_downsamples.gvcf.gz',
    's3://hail/gvcf/S19635_downsamples.gvcf.gz',
    's3://hail/gvcf/S19636_downsamples.gvcf.gz',
]
grch38 = hl.get_reference('GRCh38')

# Configure (but do not yet run) a combiner that writes the resulting VDS
# and its temporary files to the object store.
combiner = hl.vds.new_combiner(
    gvcf_paths=gvcf_inputs,
    output_path='s3://hail/vds/dataset.vds',
    temp_path='s3://hail/tmp',
    reference_genome=grch38,
    use_genome_default_intervals=True,
)

In [None]:
# Execute the combiner; its output goes to the output_path it was created with.
combiner.run()

In [None]:
# Load the VDS from the object-store location the combiner was told to write to.
vds_location = 's3://hail/vds/dataset.vds'
vds = hl.vds.read_vds(vds_location)

In [None]:
def _missing_ref_allele(locus):
    """Return a missing string as the reference allele for any locus."""
    return hl.missing('str')


# Merge the VDS into a single sparse MatrixTable, using the function above
# to supply the reference allele.
smt = hl.vds.to_merged_sparse_mt(vds, ref_allele_function=_missing_ref_allele)

In [None]:
# Display a preview of the merged sparse MatrixTable.
smt.show()

## Stop Spark context

In [None]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()