# VariantSpark example using Hail library
### Initialisation: loading libraries

In [None]:
%%juspark
{"spark.driver.memory":"24G","spark.jars":"/home/hadoop/miniconda2/envs/jupyter/lib/python2.7/site-packages/varspark/jars/varspark.jar"}


In [None]:
import hail
import varspark.hail
hc = hail.HailContext(sc)
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import numpy as np
from math import log, isnan
from pprint import pprint
from decimal import Decimal
from hail import KeyTable
from hail.keytable import asc
from hail.keytable import desc

In [None]:
numCPU=32
numPartition=numCPU*4

# Set Demo to True to quickly run the notebook on a very small dataset with 20K SNPs
Demo=False 

### Code for Creating Manhattan Plot

In [None]:
def ColorImportant(pd):
    pd['c'] = "silver"
    pd.loc[pd['chr'] == '1', 'c']  = "lightgrey"
    pd.loc[pd['chr'] == '3', 'c']  = "lightgrey"
    pd.loc[pd['chr'] == '5', 'c']  = "lightgrey"
    pd.loc[pd['chr'] == '7', 'c']  = "lightgrey"
    pd.loc[pd['chr'] == '9', 'c']  = "lightgrey"
    pd.loc[pd['chr'] == '11', 'c'] = "lightgrey"
    pd.loc[pd['chr'] == '13', 'c'] = "lightgrey"
    pd.loc[pd['chr'] == '15', 'c'] = "lightgrey"
    pd.loc[pd['chr'] == '17', 'c'] = "lightgrey"
    pd.loc[pd['chr'] == '19', 'c'] = "lightgrey"
    pd.loc[pd['chr'] == '21', 'c'] = "lightgrey"
    
    pd.loc[pd['jk'] == '2_223034082', 'c'] = "black"  #B6
    pd.loc[pd['jk'] == '4_54511913' , 'c'] = "red"   #B2
    pd.loc[pd['jk'] == '5_126626044', 'c'] = "green" #R1
    pd.loc[pd['jk'] == '7_17284577'  , 'c'] = "blue" #C2
    
    pd['Important'] = False
    
    pd.loc[pd['jk'] == '2_223034082', 'Important'] = True #B6
    pd.loc[pd['jk'] == '4_54511913' , 'Important'] = True #B2
    pd.loc[pd['jk'] == '5_126626044', 'Important'] = True #R1
    pd.loc[pd['jk'] == '7_17284577' , 'Important'] = True #C2
    return pd

ofs = [
    0,
    0,         # offset for chr1
    249250621, # offset for chr2
    492449994, # offset for chr3
    690472424,
    881626700,
    1062541960,
    1233657027,
    1392795690,
    1539159712,
    1680373143,
    1815907890,
    1950914406,
    2084766301,
    2199936179,
    2307285719,
    2409817111,
    2500171864,
    2581367074,
    2659444322,
    2722469842,
    2781598825,
    2832903391
]


![VariantSpark](https://s3.us-east-2.amazonaws.com/csiro-graphics/variant-spark.png)
* [**VariantSpark**](http://bioinformatics.csiro.au/variantspark) is a machine learning library for real-time genomic data analysis (for thousands of samples and millions of variants) and is...  
  * Built on top of Apache Spark and written in Scala
  * Authored by the team at [CSIRO Bioinformatics](http://bioinformatics.csiro.au/) in Australia
  * Uses a custom machine learning **random forest** implementation to find the most *important* variants attributing to a phenotype of interest   
* This demo...  
  * Includes a dataset with a subset of the samples and variants (in VCF format) from the 1000 Genomes Project  
  * Uses a synthetic phenotype called *HipsterIndex* (in CSV format) factoring various real phenotypes (monobrow, beard, etc.)

## Hipster Index

The synthetic HipsterIndex was created based on the 1000 Genome Project data using the following four genotypes:

| ID |SNP ID     | chromosome | position | phenotype | reference |
|---:|----------:|----:|-------:|-----:|----------:|
| B6 |[rs2218065](https://www.ncbi.nlm.nih.gov/projects/SNP/snp_ref.cgi?rs=2218065) | chr2 | 223034082 | monobrow | [Adhikari K, et al. (2016) Nat Commun.](https://www.ncbi.nlm.nih.gov/pubmed/?term=26926045) |
| R1 |[rs1363387](https://www.ncbi.nlm.nih.gov/projects/SNP/snp_ref.cgi?rs=1363387) | chr5 | 126626044 | Retina horizontal cells (checks) | [Kay, JN et al. (2012) Nature](https://www.ncbi.nlm.nih.gov/pubmed/?term=22407321)
| B2 |[rs4864809](https://www.ncbi.nlm.nih.gov/projects/SNP/snp_ref.cgi?rs=4864809) | chr4 |  54511913 | beard | [Adhikari K, et al. (2016) Nat Commun.](https://www.ncbi.nlm.nih.gov/pubmed/?term=26926045)
| C2 |[rs4410790](https://www.ncbi.nlm.nih.gov/projects/SNP/snp_ref.cgi?rs=4410790)  |chr7 |17284577| coffee consumption        | [Cornelis MC et al. (2011) PLoS Genet.](https://www.ncbi.nlm.nih.gov/pubmed/?term=21490707) |

To simulate phenotype, we first encode genotypes to integers in the following manner: *homozygote* reference encoded as 0, *heterozygote* as 1 and *homozygote alternative* as 2. Then we compute the score for each sample using the following equation.

`Score = (5*B6*B2 – 4*B6 – 4*B2 + 1) + (7*R1*C2 – 6*R1 – 6*C2 + 1)`

Next, we add noise to the score where noise is a random number with normal distribution, `mean = 0` and `standard deviation = 6`. Finally, we sort samples by `score+noise` and take the first half of as controls and the second half as cases. By doing so, we created a binary annotation for the individuals.

In the rest of this notebook, we will demonstrate the usage of VariantSpark to reverse-engineer the association of the selected SNPs to the phenotype of interest (i.e. being a hipster). We also use traditional GWAS approach and its failure to identify significant SNPs when there are complex interactions.

### Load phenotypes data

In [None]:
table = hc.import_table('s3://variant-spark/datasets/Hipster2Small/Hipster.csv', delimiter=',', impute=True)\
.key_by('Sample')

# For details of important variant please refere to the following link
# https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/8497971343024764/53198984527781/2559267461126367/latest.html
intervals = map(hail.representation.Interval.parse, ['2:223034081-223034083', '5:126626043-126626045', '4:54511912-54511914', "7:17284576-17284578"])


### Report the phenotype data

In [None]:
tpd = table.to_pandas()

In [None]:
tpd.head(5)

In [None]:
tpd.hist('Score')

In [None]:
tpd.hist('Noise')

In [None]:
tpd.groupby('isCase')['Sample'].count()

In [None]:
tpd.hist('Score+Noise')

### Load VCF file into Hail VDS format

In [None]:
vcf='s3://variant-spark/datasets/Hipster2Small/Small.558938.vcf.bgz'
rate=0.001
nTree=1000
    
if Demo==True:
    vcf='s3://variant-spark/datasets/Hipster2Small/Tiny.vcf.bgz'
    rate=0.05
    nTree=100


In [None]:
caseCtrlVds = hc.import_vcf(vcf).repartition(numPartition).cache()

### Count Number of samples and SNPs.

In [None]:
print(caseCtrlVds.count())

### Annotate dataset with phenotype

In [None]:
caseCtrlVds = caseCtrlVds.annotate_samples_table(table, root='sa.pheno')

### Compute PCA vectors

In [None]:
pca_kt = caseCtrlVds.sample_variants(rate).cache()\
.pca(k=2, scores='sa.pca_score', loadings='va.pca_loadings', eigenvalues='global.pca_evals')\
.samples_table()

pca_table = pca_kt.to_pandas()

colors = {'Control': 'green', 'Case': 'red'}
plt.figure(figsize=(15,10))
plt.scatter(pca_table["sa.pca_score.PC1"], pca_table["sa.pca_score.PC2"],
            c = pca_table["sa.pheno.population"].map(colors),
            alpha = .5)
plt.xlim(pca_table["sa.pca_score.PC1"].min(), pca_table["sa.pca_score.PC1"].max())
plt.ylim(pca_table["sa.pca_score.PC2"].min(), pca_table["sa.pca_score.PC2"].max())
plt.xlabel("PC1")
plt.ylabel("PC2")
plt.title("PCA")
legend_entries = [mpatches.Patch(color=c, label=pheno) for pheno, c in colors.items()]
plt.legend(handles=legend_entries, loc=2)
plt.show()

caseCtrlVds = caseCtrlVds.annotate_samples_table(pca_kt, root='sa.pca')

del pca_kt
del pca_table

### GWAS using VariantSpark (1000 Trees)

In [None]:
%%sh
date +%s > time.txt

In [None]:
vskt = caseCtrlVds.importance_analysis("sa.pheno.Hipster", n_trees = nTree, mtry_fraction = 0.1, oob = False,\
                                       seed = 13L,  batch_size = 100)\
.important_variants(1000000)\
.rename({'variant':'v'}).rename({'importance':'vsis'})\
.order_by(desc('vsis')).indexed('vs_rank').key_by('v')

### Annotate Hail VDS dataset with "Importance Score" computed using VariantSpark

In [None]:
caseCtrlVds = caseCtrlVds.annotate_variants_table(vskt, root='va.vsis')

### Manhattan plot for VariantSpark "Importance Score" (z-score)

In [None]:
Important_var = caseCtrlVds.filter_intervals(intervals)

pdf = caseCtrlVds.filter_variants_expr('va.vsis.vs_rank < 1000', keep=True)\
.union(Important_var).deduplicate().variants_table()\
.expand_types().flatten()\
.rename({'va.vsis.vsis':'vsis'})\
.rename({'v.contig':'chr'})\
.rename({'v.start':'pos'})\
.key_by(['vsis'])\
.select(['vsis', 'pos', 'chr'])\
.annotate('jk = chr + "_" + pos').to_pandas()

key='zvsis'

pdf[key] = (pdf['vsis'] - pdf['vsis'].mean())/pdf['vsis'].std(ddof=0)

pdf['xpos'] = pdf.apply(lambda row: ofs[int(row['chr'])] + row['pos'], axis=1)
pdf = ColorImportant(pdf)
pdf2 = pdf.loc[pdf['Important'] == True]

plt.figure(figsize=(15,10))
plt.scatter(pdf['xpos'], pdf[key], c = pdf['c'])
plt.scatter(pdf2['xpos'], pdf2[key], c = pdf2['c'], linewidths=5)
plt.xlim(pdf['xpos'].min(), pdf['xpos'].max())
plt.ylim(0, pdf[key].max()*1.1)
plt.xlabel("chromosomes")
plt.ylabel('-log10(p-value)')
plt.title("Manhattan Plot of "+ key)
#legend_entries = [mpatches.Patch(color=c, label=pheno) for pheno, c in colors.items()]
#plt.legend(handles=legend_entries, loc=2)
plt.show()

In [None]:
%%sh
start=`cat time.txt`
end=`date +%s`
runtime=$((end-start))
echo "VriantSpark runtime is "$runtime "seconds"

### Traditional GWAS using Logestic Regression (wald test)

In [None]:
%%sh
date +%s > time.txt

In [None]:
caseCtrlVds = caseCtrlVds\
.logreg(test='wald' , root='va.wald' , y='sa.pheno.isCase', \
        covariates=['sa.pca.pca_score.PC1', 'sa.pca.pca_score.PC2'])


### Manhattan plot for traditional GWAS with highlighted important SNPs

In [None]:
Important_var = caseCtrlVds.filter_intervals(intervals)

pdf = caseCtrlVds.filter_variants_expr('va.wald.pval < 0.01', keep=True)\
.union(Important_var).deduplicate().variants_table()\
.expand_types().flatten()\
.rename({'va.wald.pval':'wald_pval'})\
.rename({'v.contig':'chr'})\
.rename({'v.start':'pos'})\
.key_by(['wald_pval'])\
.select(['wald_pval', 'pos', 'chr'])\
.annotate('wald_log10 = -log10(wald_pval)')\
.annotate('jk = chr + "_" + pos').to_pandas()

key='wald_log10'

pdf['xpos'] = pdf.apply(lambda row: ofs[int(row['chr'])] + row['pos'], axis=1)
pdf = ColorImportant(pdf)
pdf2 = pdf.loc[pdf['Important'] == True]

plt.figure(figsize=(15,10))
plt.scatter(pdf['xpos'], pdf[key], c = pdf['c'])
plt.scatter(pdf2['xpos'], pdf2[key], c = pdf2['c'], linewidths=5)
plt.xlim(pdf['xpos'].min(), pdf['xpos'].max())
plt.ylim(0, pdf[key].max()*1.1)
plt.xlabel("chromosomes")
plt.ylabel('-log10(p-value)')
plt.title("Manhattan Plot of "+ key)
#legend_entries = [mpatches.Patch(color=c, label=pheno) for pheno, c in colors.items()]
#plt.legend(handles=legend_entries, loc=2)
plt.show()

In [None]:
%%sh
start=`cat time.txt`
end=`date +%s`
runtime=$((end-start))
echo "Traditional GWAS runtime is "$runtime "seconds"