<a href="https://colab.research.google.com/github/sigmarkarl/notebooks/blob/main/SparkGOR_Jupyter_Introduction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction to SparkGOR

This notebook gives an introduction to the integration of Spark and GOR using the Google Colab environment. To get meaningful results from it, you need access to data in the GOR format, such as the reference data available from the GOR Open Source project.

This notebook can be run in one of two ways:

* Connect to a hosted Jupyter runtime with Google Drive
* Connect to a local Jupyter runtime with local file storage

For the purposes of this demonstration, we use a local Jupyter runtime, set up as described on the following page: https://research.google.com/colaboratory/local-runtimes.html
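You may want the same notebook to work on both runtimes. A common pattern is to check whether the `google.colab` package can be imported, because it ships with the hosted Colab runtime. This minimal sketch assumes that your local runtime does not have `google.colab` installed. That is typical, but not guaranteed.

```python
# google.colab is bundled with the hosted Colab runtime; a local Jupyter
# runtime normally does not have it installed (assumption).
try:
    import google.colab  # noqa: F401
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

print("Hosted Colab runtime" if IN_COLAB else "Local Jupyter runtime")
```

You can then use `IN_COLAB` to decide, for example, whether to mount Google Drive.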



## Initial setup

To run the examples in this notebook, you must first install pyspark and gor_pyspark locally using your package manager. For example, if you use pip to install Python packages, you can install them with the following commands:

```python
!pip3 install pyspark
!pip3 install gor_pyspark
```

It is advisable to do this before running your local Jupyter server so that the modules will be available to you in the Jupyter environment.
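You can confirm from inside the notebook that the kernel can see the packages by looking them up with the standard library's `importlib`. This is a minimal sketch. The module names are the ones this notebook imports later, and `missing_packages` is a hypothetical helper.

```python
import importlib.util

def missing_packages(names):
    """Return the top-level module names from `names` that cannot be found."""
    return [name for name in names if importlib.util.find_spec(name) is None]

# pyspark and gor_pyspark are imported by the cells further down.
missing = missing_packages(["pyspark", "gor_pyspark"])
if missing:
    print("Install these before starting Jupyter:", ", ".join(missing))
else:
    print("All required packages are available.")
```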

You should also make sure that the Jupyter server uses Python 3, which pyspark requires. For example, set `PYSPARK_PYTHON` in your shell before starting the server:

```shell
export PYSPARK_PYTHON=/usr/local/bin/python3
```
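You may not be able to change the environment the server starts with. In that case, a common PySpark alternative is to set `PYSPARK_PYTHON` from the notebook itself, before the SparkSession is created. This sketch points the variable at the interpreter running the kernel (`sys.executable`). That is an assumption about which Python you want Spark to use.

```python
import os
import sys

# pyspark requires Python 3; fail early if the kernel is older.
if sys.version_info < (3,):
    raise RuntimeError("pyspark needs Python 3; switch the Jupyter kernel to Python 3")

# Must run before SparkSession.builder...getOrCreate(). setdefault keeps
# any value that was already exported in the shell.
os.environ.setdefault("PYSPARK_PYTHON", sys.executable)
print("PYSPARK_PYTHON =", os.environ["PYSPARK_PYTHON"])
```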

## Hosted Jupyter runtime

You can also connect to a hosted Jupyter server by selecting **Connect to hosted runtime** in the dropdown menu at the top of this page. Keep in mind that memory is limited to what the Google Colab environment provides, although this should be sufficient to run the examples here.

If you wish to use this method, the files are shared at the following location: https://drive.google.com/drive/folders/1lM6YKPZIexb_1D8pk6KnVd5AXn0Jp0ek

You will need to set up the files within your Google Drive. Note that the files needed for the examples exceed the free Google Drive storage limit, so you will need an account with a higher storage limit.

Use the following code cell to mount your Google Drive and make the folder accessible to the hosted runtime.

```python
# Mount Google Drive at /content/drive
from google.colab import drive
drive.mount('/content/drive')
```
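After mounting, it can help to make the folder with the shared files the working directory, so that relative paths such as `dbsnp.parquet` in the examples resolve there. This is a minimal sketch. The folder path `/content/drive/MyDrive/gorproject` is a hypothetical location, so replace it with wherever you placed the files.

```python
import os

# Hypothetical location of the shared files inside the mounted Drive.
PROJECT_DIR = "/content/drive/MyDrive/gorproject"

def use_project_dir(path):
    """Change into `path` if it is an existing directory; return True on success."""
    if os.path.isdir(path):
        os.chdir(path)
        return True
    return False

if not use_project_dir(PROJECT_DIR):
    print("Folder not found, edit PROJECT_DIR:", PROJECT_DIR)
```

Several cells below also build paths from `~/gorproject`, so adjust those paths to match as well.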

## Setting up the reference data and PheCode

If you are using a local runtime, install and run the Jupyter server by following the instructions on this page: https://research.google.com/colaboratory/local-runtimes.html

You must also download the reference data and PheCode files as described in the following sections of the GOR Open Source project:

* [Set up the reference data](https://github.com/gorpipe/gor#setting-up-reference-data-optional)
* [Set up the PheCode GWAS data](https://github.com/gorpipe/gor#setting-up-phecode-gwas-data-optional)

Start your local Jupyter server from within the folder where you have put your reference data files. The examples below use `~/gorproject` as the project directory. If your files are elsewhere, you may need to edit the paths.
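Before running the examples, you can check that the files they read are where the paths expect them. The list below collects the relative paths used in this notebook's cells. The sketch assumes they resolve against `~/gorproject`, the project directory passed to the GOR session later on. `missing_files` is a hypothetical helper.

```python
import os

# Relative paths used by the examples in this notebook.
EXPECTED_FILES = [
    "config/gor_config.txt",
    "config/gor_standard_aliases.txt",
    "ref/dbsnp/dbsnp.gorz",
    "ref/genes.gorz",
    "ref/refgenes/refgenes_exons.gorz",
    "phecode_gwas/Phecode_adjust_f2.gord",
    "phecode_gwas/metadata/vep_single.gorz",
]

def missing_files(root, paths):
    """Return the entries of `paths` that do not exist under `root`."""
    return [p for p in paths if not os.path.exists(os.path.join(root, p))]

missing = missing_files(os.path.expanduser("~/gorproject"), EXPECTED_FILES)
print("Missing:", missing if missing else "none")
```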

```python
# Configure and start the PySpark session with the gor-spark package

# spark.stop()  # uncomment to stop an existing session before creating a new one
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]") \
    .appName("SparkGOR demo") \
    .config("spark.jars.packages", "org.gorpipe:gor-spark:3.19.0") \
    .config("spark.jars.excludes", "org.eclipse.jetty:jetty-util-ajax,io.dropwizard.metrics:metrics-jvm,io.dropwizard.metrics:metrics-json,org.jetbrains.kotlin:kotlin-stdlib-common") \
    .getOrCreate()

# On lower-spec computers, the following additional settings may be necessary.
# Append them to the .config(...) calls above, before .getOrCreate():
# .config("spark.driver.memory", "8g").config("spark.executor.memory", "8g")
```
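If you switch between machines, you can keep the optional memory settings easy to toggle by holding the configuration in dictionaries. You then apply them in a loop with the builder's `config(key, value)` method. This is a sketch: `LOW_MEMORY`, `BASE_CONF`, `MEMORY_CONF` and `apply_conf` are hypothetical names, and the values are the ones from the cell above.

```python
# Settings from the cell above, kept in dictionaries so they can be combined.
BASE_CONF = {
    "spark.jars.packages": "org.gorpipe:gor-spark:3.19.0",
    "spark.jars.excludes": "org.eclipse.jetty:jetty-util-ajax,io.dropwizard.metrics:metrics-jvm,"
                           "io.dropwizard.metrics:metrics-json,org.jetbrains.kotlin:kotlin-stdlib-common",
}
MEMORY_CONF = {"spark.driver.memory": "8g", "spark.executor.memory": "8g"}

LOW_MEMORY = False  # hypothetical switch: set to True on lower-spec computers

def apply_conf(builder, conf):
    """Call builder.config(key, value) for every entry and return the builder."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder

conf = {**BASE_CONF, **(MEMORY_CONF if LOW_MEMORY else {})}

# Usage with PySpark (these settings only take effect when a new session is created):
# from pyspark.sql import SparkSession
# spark = apply_conf(SparkSession.builder.master("local[*]").appName("SparkGOR demo"), conf).getOrCreate()
```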

```python
# Initialize the SparkGOR session.
# createGorSessionWOptions lets you specify other folders and files than the defaults.
# As used here, the arguments are: the project directory, the result cache folder,
# the GOR config file and the standard aliases file.

import os
import gor_pyspark  # provides the GOR session methods and DataFrame.gor used below
#sgs = spark.createGorSession()
sgs = spark.createGorSessionWOptions(os.path.expanduser("~/gorproject"), "result_cache", "config/gor_config.txt", "config/gor_standard_aliases.txt")
```

```python
# A SparkGOR query that requires no input data
sgs.pydataframe("gorrows -p chr1:1-4 | calc x pos+1").toPandas()
```

|   | chrom | pos | x |
|---|---|---|---|
| 0 | chr1 | 1 | 2 |
| 1 | chr1 | 2 | 3 |
| 2 | chr1 | 3 | 4 |
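The following plain-Python sketch reproduces the table above to make the query's semantics concrete. `gorrows` produces one row per position. Judging by the output, the end of the range is exclusive, so `chr1:1-4` yields positions 1 to 3. `gorrows_sketch` is a hypothetical helper for illustration only, not part of `gor_pyspark`.

```python
def gorrows_sketch(chrom, start, end):
    """Yield (chrom, pos) for start <= pos < end, matching the output above."""
    for pos in range(start, end):
        yield chrom, pos

# Equivalent of "| calc x pos+1": add a column x derived from pos.
rows = [(chrom, pos, pos + 1) for chrom, pos in gorrows_sketch("chr1", 1, 4)]
print(rows)  # [('chr1', 1, 2), ('chr1', 2, 3), ('chr1', 3, 4)]
```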


# Examples from the SparkGOR paper

The following section runs examples from the [SparkGOR paper](https://arxiv.org/abs/2009.00061).

### Examples 2, 3 and 4
This example shows how to use a nested GOR query as a Spark DataSource in SQL.
Note that the partitioning is controlled by the parallel `pgor` query, which splits the work by genomic locus.
The results are cached using a GOR `create` statement together with Spark SQL.
The GOR-specific `varjoin` command is optimized for joining genomic variants.
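The sketch below gives some intuition for locus-based partitioning. It splits position-sorted rows into one partition per chromosome, processes the partitions concurrently, and returns the results in genomic order. It is an illustration only. Splitting per chromosome is an assumption made for simplicity, and SparkGOR's actual partitioning of `pgor` queries may differ. `run_per_locus` is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import groupby
from operator import itemgetter

def run_per_locus(rows, process, max_workers=4):
    """Apply `process` to each chromosome's rows in parallel.

    `rows` are (chrom, pos, ...) tuples sorted by (chrom, pos), so every
    chromosome forms one contiguous run. Results keep the partition order.
    """
    partitions = [list(group) for _, group in groupby(rows, key=itemgetter(0))]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(process, partitions))

rows = [("chr1", 100), ("chr1", 250), ("chr2", 75)]
print(run_per_locus(rows, len))  # [2, 1]
```

In Spark, each partition would be a separate task rather than a thread. The key property is the same: partitions are independent and can be recombined in genomic order.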

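```python
# Run a nested parallel GOR query through Spark SQL and order the result by rsID
ordbsnp = sgs.pydataframe("select * from <(pgor ref/dbsnp/dbsnp.gorz | top 100000 | split rsIDs | rename rsIDs rsID) order by rsID")
# Save the rsID-ordered table in the project directory (Parquet is Spark's default format)
savepath = os.path.expanduser("~/gorproject/dbsnp.rsOrd.parquet")
ordbsnp.write.mode("overwrite").save(savepath)
# Register a create that selects a subset of rsIDs and restores genomic order
sgs.setCreate("#myordrssnps#", "select * from dbsnp.rsOrd.parquet where rsID like 'rs222%' order by chrom, pos")

# Join the selected SNPs with the PheCode GWAS results, then sort by p-value and rsID
ss = sgs.pydataframe("create #myphewas# = pgor [#myordrssnps#] | varjoin -l -r phecode_gwas/Phecode_adjust_f2.gord; nor [#myphewas#] | sort -c pval_mm:n,rsID")
ss.limit(10).toPandas()
```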

|   | chrom | pos | reference | allele | rsID | pVal_mm | OR_mm | CASE_info | GC | QQ | BONF | HOLM | Source |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | chr16 | 29811 | T | C | rs2228650 | 2.6e-28 | 0.225538 | 3/39/133 | 0.17751 | 0.31779 | 4.447e-21 | 3.0338999999999998e-21 | 282.5 |
| 1 | chr12 | 55657 | C | T | rs2221166 | 8.4e-22 | 0.37643 | 13/77/159 | 0.24099 | 0.3565 | 1.4367e-14 | 9.2455e-15 | 282.5 |
| 2 | chr17 | 155173 | A | G | rs2223138 | 2.7e-15 | 0.504448 | 231/149/26 | 0.3341 | 0.40866 | 4.6181e-08 | 2.7311e-08 | 282.5 |
| 3 | chr12 | 55657 | C | T | rs2221166 | 4.7e-12 | 0.340889 | 8/26/76 | 0.055197 | 0.21557 | 8.0389e-05 | 6.3061e-05 | 133.0 |
| 4 | chr20 | 297874 | T | C | rs2223665 | 1.8e-11 | 1.73077 | 160/139/33 | 0.41144 | 0.4508 | 0.00030787 | 0.00016913 | 282.5 |
| 5 | chr12 | 55657 | C | T | rs2221166 | 1.9e-10 | 0.46172 | 15/50/100 | 0.047745 | 0.14893 | 0.0032498 | 0.002766 | 282.8 |
| 6 | chr12 | 55657 | C | T | rs2221166 | 1.9e-06 | 0.937565 | 1964/5591/4538 | 0.1387 | 0.26042 | 1.0 | 1.0 | 250.2 |
| 7 | chr12 | 55657 | C | T | rs2221166 | 2.7e-06 | 0.824517 | 166/588/512 | 0.005935 | 0.057643 | 1.0 | 1.0 | 646.0 |
| 8 | chr16 | 29811 | T | C | rs2228650 | 6.7e-06 | 0.919421 | 936/2972/2544 | 0.16164 | 0.27737 | 1.0 | 1.0 | 250.2 |
| 9 | chr17 | 155173 | A | G | rs2223138 | 8.6e-06 | 1.489344 | 500/117/4 | 3.3e-05 | 0.000241 | 1.0 | 1.0 | 561.1 |
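Joins on genomic data can exploit the fact that both inputs are sorted by position. The sketch below is a simplified sort-merge inner join on (chrom, pos, reference, allele) that walks each input once. It illustrates why position-ordered joins are efficient and does not describe how `varjoin` is implemented or which options it supports. It assumes both inputs are sorted by Python's (chrom, pos) tuple ordering, that is, with lexicographic chromosome names. `variant_merge_join` is a hypothetical helper.

```python
def variant_merge_join(left, right):
    """Inner-join (chrom, pos, ref, alt, payload) tuples sorted by (chrom, pos)."""
    out = []
    j = 0
    for chrom, pos, ref, alt, lval in left:
        # Skip right rows that lie before the current left locus; they cannot match later.
        while j < len(right) and (right[j][0], right[j][1]) < (chrom, pos):
            j += 1
        # Scan right rows at the same locus without consuming them,
        # because the next left row may share this locus.
        k = j
        while k < len(right) and (right[k][0], right[k][1]) == (chrom, pos):
            if right[k][2] == ref and right[k][3] == alt:
                out.append((chrom, pos, ref, alt, lval, right[k][4]))
            k += 1
    return out

left = [("chr1", 10, "A", "C", "l1"), ("chr1", 20, "G", "T", "l2"), ("chr2", 5, "C", "A", "l3")]
right = [("chr1", 10, "A", "C", "r1"), ("chr1", 10, "A", "G", "r2"),
         ("chr1", 20, "G", "T", "r3"), ("chr3", 1, "A", "C", "r4")]
print(variant_merge_join(left, right))
```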


### Example 7
This example shows how a pandas DataFrame can be used in a GOR query by registering it as a temporary view in Spark.
The view is then stored in the GOR cache using a `create` statement.

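```python
import pandas as pd

# Build a small pandas DataFrame of gene symbols and register it as a Spark temp view
myPandasGenes = pd.DataFrame(["BRCA1", "BRCA2"], columns=["gene"])
myGenes = spark.createDataFrame(myPandasGenes)
myGenes.createOrReplaceTempView("myGenes")
# Cache the view as #mygenes# and define aliases for the reference files
sgs.setCreateAndDefs("""
    create #mygenes# = select gene from myGenes;
    def #genes# = ref/genes.gorz;
    def #exons# = ref/refgenes/refgenes_exons.gorz;
    def #dbsnp# = ref/dbsnp/dbsnp.gorz;
""")
# Keep only the exons of the genes listed in #mygenes#
sgs.setCreate("#myexons#", "gor #exons# | inset -c gene_symbol [#mygenes#]")
# Join the exons with dbSNP variants, then join the variants with the gene segments
exonSnps = sgs.pydataframe("pgor [#myexons#] | join -segsnp -ir #dbsnp# | join -snpseg -r #genes#")
# Count variants per gene with the regular Spark DataFrame API
snpCount = exonSnps.groupBy("gene_symbol").count()
snpCount.toPandas()
```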

|   | gene_symbol | count |
|---|---|---|
| 0 | RPL21P4 | 15 |
| 1 | BRCA1 | 2101 |
| 2 | BRCA2 | 3037 |
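The `inset -c gene_symbol [#mygenes#]` step keeps only exon rows whose `gene_symbol` appears in the cached gene list. `groupBy(...).count()` then tallies the rows per gene. Here is a plain-Python sketch of those two steps. The exon rows below are made up for illustration and are not taken from the reference data.

```python
from collections import Counter

genes = {"BRCA1", "BRCA2"}  # the contents of #mygenes#

# Hypothetical (chrom, start, end, gene_symbol) exon rows.
exons = [
    ("chr13", 100, 200, "BRCA2"),
    ("chr13", 300, 400, "BRCA2"),
    ("chr17", 500, 600, "BRCA1"),
    ("chr17", 700, 800, "NBR1"),
]

# Like inset -c gene_symbol: keep rows whose gene_symbol column is in the set.
selected = [row for row in exons if row[3] in genes]

# Like groupBy("gene_symbol").count(): number of rows per gene.
counts = Counter(row[3] for row in selected)
print(sorted(counts.items()))  # [('BRCA1', 1), ('BRCA2', 2)]
```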


### Example 8
The same query as Example 7, expressed in SQL with nested GOR syntax.

```python
snpCount2 = sgs.pydataframe("select count(*) from <(pgor [#myexons#] | join -segsnp -ir #dbsnp# | join -snpseg -r ref/genes.gorz) group by gene_symbol")
snpCount2.toPandas()
```

|   | count(1) |
|---|---|
| 0 | 15 |
| 1 | 2101 |
| 2 | 3037 |
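The query selects only `count(*)`, so the result has no column saying which gene each count belongs to. Row order is also not guaranteed without an `order by`. Adding `gene_symbol` to the select list labels the counts. You can try the difference with Python's built-in `sqlite3` on hypothetical rows; the same SQL shape applies in Spark SQL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE snps (gene_symbol TEXT, rsid TEXT)")
# Hypothetical rows: one per variant, tagged with its gene.
conn.executemany(
    "INSERT INTO snps VALUES (?, ?)",
    [("BRCA1", "rs1"), ("BRCA1", "rs2"), ("BRCA2", "rs3")],
)

# Same shape as the query in the cell above (plus ORDER BY for a stable result).
unlabelled = conn.execute(
    "SELECT count(*) FROM snps GROUP BY gene_symbol ORDER BY gene_symbol"
).fetchall()
# With the grouping column selected, each count is labelled.
labelled = conn.execute(
    "SELECT gene_symbol, count(*) FROM snps GROUP BY gene_symbol ORDER BY gene_symbol"
).fetchall()
conn.close()

print(unlabelled)  # [(2,), (1,)]
print(labelled)    # [('BRCA1', 2), ('BRCA2', 1)]
```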


Here, the Spark DataSource registered by GOR is used to load a file in the .gorz format into a Spark DataFrame.

```python
# Load dbsnp.gorz through the GOR Spark DataSource and save the first rows as a Parquet file
dbsnpGorz = spark.read.format("gor").load(os.path.expanduser("~/gorproject/ref/dbsnp/dbsnp.gorz")).limit(1001)
# Alternative: load the same file with a GOR SQL query
#dbsnpGorz = sgs.pydataframe("select * from ref/dbsnp/dbsnp.gorz").limit(1000)
dbsnpGorz.write.mode("overwrite").save("dbsnp.parquet")  # relative to the working directory
dbsnpGorz.limit(5).toPandas()
```

|   | chrom | pos | reference | allele | rsids |
|---|---|---|---|---|---|
| 0 | chr1 | 10020 | AA | A | rs775809821 |
| 1 | chr1 | 10039 | A | C | rs978760828 |
| 2 | chr1 | 10043 | T | A | rs1008829651 |
| 3 | chr1 | 10051 | A | G | rs1052373574 |
| 4 | chr1 | 10054 | C | CC | rs1326880612 |


```python
# Remove a registered create statement; the call returns the query that was removed
sgs.removeCreate("#myexons#")
```

```
'gor #exons# | inset -c gene_symbol [#mygenes#]'
```

### Example 9
The GOR Python SDK registers a `.gor` method on Spark DataFrames. The GOR commands passed to it are run on each partition as a `mapPartitions` function.
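As a mental model of the `.gor` call in the next cell, the sketch below applies the same `calc type` logic to each partition's iterator. This mirrors how `mapPartitions` hands a function one iterator per partition. The helper names are hypothetical, and plain lists stand in for Spark partitions.

```python
def classify_partition(rows):
    """Add a type column, mirroring calc type = if(len(reference)=len(allele),'Snp','InDel')."""
    for chrom, pos, reference, allele in rows:
        kind = "Snp" if len(reference) == len(allele) else "InDel"
        yield chrom, pos, reference, allele, kind

def map_partitions(partitions, func):
    """Call func once per partition with an iterator over that partition's rows."""
    return [list(func(iter(part))) for part in partitions]

# Two partitions built from rows of the dbSNP output above.
partitions = [
    [("chr1", 10020, "AA", "A"), ("chr1", 10039, "A", "C")],
    [("chr1", 10054, "C", "CC")],
]
result = map_partitions(partitions, classify_partition)
print(result)
```

Because the function receives a whole partition at once, per-partition setup (such as starting a GOR pipeline) happens once per partition rather than once per row.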

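```python
# Load the Parquet file written earlier
dbsnpDf = spark.read.load("dbsnp.parquet")

# Run a GOR calc on the DataFrame to label each variant as Snp or InDel
myVars = dbsnpDf.gor("calc type = if(len(reference)=len(allele),'Snp','InDel')")
myVars.createOrReplaceTempView("myVars")
# Define an alias for the VEP annotation file
sgs.setDef("#VEPP#", "phecode_gwas/metadata/vep_single.gorz")
# Sort the variants by genomic position with Spark SQL
myVarsAnno = sgs.pydataframe("select * from myVars order by chrom,pos")
# Normalize the variants, collapse rsIDs per variant and join the VEP max_consequence
pyVarsAnno = myVarsAnno.gor("varnorm -left reference allele | group 1 -gc reference,allele,type -set -sc rsIDs | rename set_rsIDs rsIDs | varjoin -r -l -e 'NA' <(gor #VEPP# | select 1-call,max_consequence)")
pyVarsAnno.limit(5).toPandas()
```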

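|   | chrom | pos | reference | allele | type | rsIDs | max_consequence |
|---|---|---|---|---|---|---|---|
| 0 | chr1 | 10020 |  | N | InDel | rs775809821 |  |
| 1 | chr1 | 10039 | A | C | Snp | rs978760828 |  |
| 2 | chr1 | 10043 | T | A | Snp | rs1008829651 |  |
| 3 | chr1 | 10051 | A | G | Snp | rs1052373574 |  |
| 4 | chr1 | 10054 | N | NC | InDel | rs1326880612 |  |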
