# Hail Tutorial - Student Edition

### Acquring and Installing the Resources
Firstly, we need to acquire the resources necessary to run Hail. Click the two links below to download the Hail jar and python files to your local system.

* https://github.com/mptrepanier/spark-saturday-advanced/blob/master/resources/hail-2.1.1.jar?raw=true

* https://github.com/mptrepanier/spark-saturday-advanced/blob/master/resources/pyhail.zip?raw=true

Next, we need add these libraries to our cluster. In order to do so:

1) Click the "Workspace" button on the left.

2) On the top of the tab that opens up, click the "Workspace" dropdown.

3) Select the "Import" option from the dropdown.

4) At the bottom of the window that opens, click "click here" in "To import a library, such as a jar or egg, click here."

5) Drag and drop the the Hail jar from your local filespace to the appropriate place in the location. After doing so, select "Create Library."

6) IMPORTANT: Click the link to the jar file at the top of your screen - copy the full jar file name. You will need this later.

7) Repeat step five for the pyhail.egg - there is no need to copy the name.

###Getting the Cluster Up and Running

Hail requires some additional installation steps in order to get up and running. Specfically, we need to ensure the Hail jar is visible across the cluster. The below script creates a databricks init script which provides this visibility. In order to run it, please create a cluster (any, we're just using it to call the below script) and then run the below cell.

In [4]:
dbutils.fs.put("dbfs:/databricks/init/install_hail.sh", """
#!/bin/bash
JAR_NAME="" # Place your JAR name here.
cp /dbfs/FileStore/jars/${JAR_NAME} /mnt/driver-daemon/jars/hail-2_1_1.jar
cp /dbfs/FileStore/jars/${JAR_NAME} /mnt/jars/driver-daemon/hail-2_1_1.jar
""",
True
)

Next, terminate the tempory cluster you just created. Create a new cluster of type `Spark 2.1 (Auto-updating, Scala 2.11) `. As well, when launching the cluster, please copy the below Spark configuration options into the Spark Tab at the bottom.

```
spark.hadoop.io.compression.codecs org.apache.hadoop.io.compress.DefaultCodec,is.hail.io.compress.BGzipCodec,org.apache.hadoop.io.compress.GzipCodec
spark.sql.files.openCostInBytes 1099511627776
spark.sql.files.maxPartitionBytes 1099511627776
spark.hadoop.mapreduce.input.fileinputformat.split.minsize 1099511627776
spark.hadoop.parquet.block.size 1099511627776
spark.driver.extraClassPath ./hail-2_1_1.jar
spark.executor.extraClassPath ./hail-2_1_1.jar
```

### An Introduction to Hail

Tutorial pulled from: https://hail.is/docs/stable/tutorials/hail-overview.html

We begin with some imports.

In [7]:
from hail import *
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from collections import Counter
from math import log, isnan
from pprint import pprint
%matplotlib inline

Each notebook starts the same: creating a `HailContext`. The HailContext serves as an analog to Spark's `SparkContext`, and can take a SparkContext as an input.
In the Databricks environment, developers should utilize the already existing SparkContext `sc` and pass it into the `HailContext` method.

In [9]:
hc = HailContext(sc)

Before we can read the variant dataset `vds` into Hail/Spark, we need to ensure that the data is accessible on our cluster. The below snippet checks locally for the presence of the variant dataset and fetches it if it is absent. The last three lines place the data in the databricks filestore - a cluster accessible file location.

In [11]:
import os
if os.path.isdir('/tmp/data/1kg.vds') and os.path.isfile('/tmp/data/1kg_annotations.txt'):
    print('All files are present and accounted for!')
else:
    import sys
    sys.stderr.write('Downloading data (~50M) from Google Storage...\n')
    import urllib
    import tarfile
    urllib.urlretrieve('https://storage.googleapis.com/hail-1kg/tutorial_data.tar',
                       '/tmp/tutorial_data.tar')
    sys.stderr.write('Download finished!\n')
    sys.stderr.write('Extracting...\n')
    tarfile.open('/tmp/tutorial_data.tar').extractall('/tmp')
    if not (os.path.isdir('/tmp/data/1kg.vds') and os.path.isfile('/tmp/data/1kg_annotations.txt')):
        raise RuntimeError('Something went wrong!')
    else:
        sys.stderr.write('Done!\n')
        
dbutils.fs.mkdirs('/FileStore/hail-data')
dbutils.fs.cp('file:/tmp/data/1kg_annotations.txt', '/FileStore/hail-data/1kg_annotations.txt')
dbutils.fs.cp('file:/tmp/data/1kg.vds', '/FileStore/hail-data/1kg.vds', recurse=True)
        

With the data present in our FileStore at `/FileStore/hail-data/1kg.vds`, we can go ahead and read it in. Call `vds.summarize().report()` to view some summary statistics about the variant dataset.

In [13]:
vds = hc.read("") # Enter in the file name.
vds.summarize().report()

The query_variants method is the first time we’ll see the Hail expression language. The expression language allows for a variety of incredibly expressive queries and computations, but is probably the most complex part of Hail. Try entering `'variants.take(5)'` in the method call and running the cell.

In [15]:
vds.query_variants('') # Enter the query description here to view the first 5 variants.

There are often several ways to do something in Hail. Here are two ways to peek at the first few sample IDs:

In [17]:
vds.query_samples('samples.take(5)')

In [18]:
vds.sample_ids[:5]

There’s a similar interface for looking at the genotypes in a dataset. We use `query_genotypes` to look at the first few genotype calls.

In [20]:
vds.query_genotypes('gs.take(5)')

## Integrate Sample Annotations

The annotations file can be imported into Hail with HailContext.import_table. This method produces a KeyTable object. Think of this as a Pandas or R dataframe that isn’t limited by the memory on your machine – behind the scenes, it’s distributed with Spark.

In [23]:
table = hc.import_table('/FileStore/hail-data/1kg_annotations.txt', impute=True) \
    .key_by('Sample')

A good way to peek at the structure of a `KeyTable` is to look at its `schema.`

In [25]:
print(table.schema)

The Python pprint method makes illegible printouts pretty:

In [27]:
pprint(table.schema)

Now we’ll use this table to add sample annotations to our dataset. First, we’ll print the schema of the sample annotations already there:

In [29]:
pprint(vds.sample_schema)

We use the annotate_samples_table method to join the table with the VDS.

In [31]:
vds = vds.annotate_samples_table(table, root='sa')
pprint(vds.sample_schema)

## Query Functions and the Hail Expression Language

We start by looking at some statistics of the information in our table. The query method uses the expression language to aggregate over the rows of the table.

`counter` is an aggregation function that counts the number of occurrences of each unique element. We can use this to pull out the population distribution.

In [34]:
pprint(table.query('SuperPopulation.counter()'))

`stats` is an aggregation function that produces some useful statistics about numeric collections. We can use this to see the distribution of the CaffeineConsumption phenotype.

In [36]:
pprint(table.query('CaffeineConsumption.stats()'))

However, these metrics aren’t perfectly representative of the samples in our dataset. Here’s why:

In [38]:
vds.query_samples('samples.map(s => sa.SuperPopulation).counter()')

In [39]:
pprint(vds.query_samples('samples.map(s => sa.CaffeineConsumption).stats()'))

The functionality demonstrated in the last few cells isn’t anything especially new: it’s certainly not difficult to ask these questions with Pandas or R dataframes, or even Unix tools like awk. But Hail can use the same interfaces and query language to analyze collections that are much larger, like the set of variants.

Here we calculate the counts of each of the 12 possible unique SNPs (4 choices for the reference base * 3 choices for the alternate base). To do this, we need to map the variants to their alternate allele, filter to SNPs, and count by unique ref/alt pair:

In [41]:
snp_counts = vds.query_variants('variants.map(v => v.altAllele()).filter(aa => aa.isSNP()).counter()')
pprint(Counter(snp_counts).most_common())

It’s nice to see that we can actually uncover something biological from this small dataset: we see that these frequencies come in pairs. C/T and G/A are actually the same mutation, just viewed from from opposite strands. Likewise, T/A and A/T are the same mutation on opposite strands. There’s a 30x difference between the frequency of C/T and A/T SNPs. Why?

The same Python, R, and Unix tools could do this work as well, but we’re starting to hit a wall - the latest gnomAD release publishes about 250 million variants, and that won’t fit in memory on a single computer.

What about genotypes? Hail can query the collection of all genotypes in the dataset, and this is getting large even for our tiny dataset. Our 1,000 samples and 10,000 variants produce 10 million unique genotypes. The gnomAD dataset has about 5 trillion unique genotypes.

Here we will use the hist aggregator to produce and plot a histogram of DP values for genotypes in our thousand genomes dataset.

In [43]:
dp_hist = vds.query_genotypes('gs.map(g => g.dp).hist(0, 30, 30)')
fig, ax = plt.subplots()
plt.xlim(0, 31)
plt.bar(dp_hist.binEdges[1:], dp_hist.binFrequencies)
display(fig)

## Quality Control

QC is where analysts spend most of their time with sequencing datasets. QC is an iterative process, and is different for every project: there is no “push-button” solution for QC. Each time the Broad collects a new group of samples, it finds new batch effects. However, by practicing open science and discussing the QC process and decisions with others, we can establish a set of best practices as a community.

QC is entirely based on the ability to understand the properties of a dataset. Hail attempts to make this easier by providing the sample_qc method, which produces a set of useful metrics as sample annotations.

In [46]:
pprint(vds.sample_schema)

In [47]:
vds = vds.sample_qc()
pprint(vds.sample_schema)

Interoperability is a big part of Hail. vds have a method `samples_table()` which generates a table view of the sample annotations. Initially in parquet format, this table can be converted into a pandas dataframe by calling the `to_pandas()` method on it.

In [49]:
df = # Call the aforementioned methods on the variant dataset and subsequent
     # samples table to create a pandas dataframe.
df.head()

Plotting the QC metrics is a good place to start.

In [51]:
plt.clf()
plt.subplot(1, 2, 1)
plt.hist(df["sa.qc.callRate"], bins=np.arange(.75, 1.01, .01))
plt.xlabel("Call Rate")
plt.ylabel("Frequency")
plt.xlim(.75, 1)

plt.subplot(1, 2, 2)
plt.hist(df["sa.qc.gqMean"], bins = np.arange(0, 105, 5))
plt.xlabel("Mean Sample GQ")
plt.ylabel("Frequency")
plt.xlim(0, 105)

plt.tight_layout()
display(fig)

Often, these metrics are correlated.

In [53]:
plt.clf()
plt.scatter(df["sa.qc.dpMean"], df["sa.qc.callRate"],
            alpha=0.1)
plt.xlabel('Mean DP')
plt.ylabel('Call Rate')
plt.xlim(0, 20)
display(fig)

Removing outliers from the dataset will generally improve association results. We can draw lines on the above plot to indicate outlier cuts. We’ll want to remove all samples that fall in the bottom left quadrant.

In [55]:
plt.scatter(df["sa.qc.dpMean"], df["sa.qc.callRate"],
            alpha=0.1)
plt.xlabel('Mean DP')
plt.ylabel('Call Rate')
plt.xlim(0, 20)
plt.axhline(0.97, c='k')
plt.axvline(4, c='k')
display(fig)

It’s easy to filter when we’ve got the cutoff values decided:

In [57]:
vds = vds.filter_samples_expr('sa.qc.dpMean >= 4 && sa.qc.callRate >= 0.97')
print('After filter, %d/1000 samples remain.' % vds.num_samples)

Next is genotype QC. To start, we’ll print the post-sample-QC call rate. It’s actually gone up since the initial summary - dropping low-quality samples disproportionally removed missing genotypes.

In [59]:
call_rate = vds.query_genotypes('gs.fraction(g => g.isCalled)')
print('pre QC call rate is %.3f' % call_rate)

It’s a good idea to filter out genotypes where the reads aren’t where they should be: if we find a genotype called homozygous reference with >10% alternate reads, a genotype called homozygous alternate with >10% reference reads, or a genotype called heterozygote without a ref / alt balance near 1:1, it is likely to be an error.

In [61]:
filter_condition_ab = '''let ab = g.ad[1] / g.ad.sum() in
                         ((g.isHomRef && ab <= 0.1) ||
                          (g.isHet && ab >= 0.25 && ab <= 0.75) ||
                          (g.isHomVar && ab >= 0.9))'''
vds = vds.filter_genotypes() # Pass in the filter condition to the
                             # filter_genotypes() method.
post_qc_call_rate = vds.query_genotypes('gs.fraction(g => g.isCalled)')
print('post QC call rate is %.3f' % post_qc_call_rate)

Variant QC is a bit more of the same: we can use the variant_qc method to produce a variety of useful statistics, plot them, and filter.

In [63]:
pprint(vds.variant_schema)

The cache is used to optimize some of the downstream operations.

In [65]:
vds = vds.variant_qc().cache()
pprint(vds.variant_schema)

In [66]:
variant_df = vds.variants_table().to_pandas()

plt.clf()
plt.subplot(2, 2, 1)
variantgq_means = variant_df["va.qc.gqMean"]
plt.hist(variantgq_means, bins = np.arange(0, 84, 2))
plt.xlabel("Variant Mean GQ")
plt.ylabel("Frequency")
plt.xlim(0, 80)

plt.subplot(2, 2, 2)
variant_mleaf = variant_df["va.qc.AF"]
plt.hist(variant_mleaf, bins = np.arange(0, 1.05, .025))
plt.xlabel("Minor Allele Frequency")
plt.ylabel("Frequency")
plt.xlim(0, 1)

plt.subplot(2, 2, 3)
plt.hist(variant_df['va.qc.callRate'], bins = np.arange(0, 1.05, .01))
plt.xlabel("Variant Call Rate")
plt.ylabel("Frequency")
plt.xlim(.5, 1)

plt.subplot(2, 2, 4)
plt.hist(variant_df['va.qc.pHWE'], bins = np.arange(0, 1.05, .025))
plt.xlabel("Hardy-Weinberg Equilibrium p-value")
plt.ylabel("Frequency")
plt.xlim(0, 1)

plt.tight_layout()
display(fig)

These statistics actually look pretty good: we don’t need to filter this dataset. Most datasets require thoughtful quality control, though. The filter_variants_expr method can help!

## Let's do a GWAS!

First, we need to restrict to variants that are:

* common (we’ll use a cutoff of 1%)
* uncorrelated (not in linkage disequilibrium)

Both of these are easy in Hail.

As well, use the `count()` method tally how many sites we have remaining in `common_vds`.

In [70]:
common_vds = (vds
              .filter_variants_expr('va.qc.AF > 0.01')
              .ld_prune(memory_per_core=256, num_cores=4))

count = # Apply the count method to common_vds().


These filters removed about 15% of sites (we started with a bit over 10,000). This is NOT representative of most sequencing datasets! We have already downsampled the full thousand genomes dataset to include more common variants than we’d expect by chance.

In Hail, the association tests accept sample annotations for the sample phenotype and covariates. Since we’ve already got our phenotype of interest (caffeine consumption) in the dataset, we are good to go:

In [72]:
gwas = common_vds.linreg('sa.CaffeineConsumption')
pprint(gwas.variant_schema)

Looking at the bottom of the above printout, you can see the linear regression adds new variant annotations for the beta, standard error, t-statistic, and p-value.

In [74]:
def qqplot(pvals, xMax, yMax):
    spvals = sorted(filter(lambda x: x and not(isnan(x)), pvals))
    exp = [-log(float(i) / len(spvals), 10) for i in np.arange(1, len(spvals) + 1, 1)]
    obs = [-log(p, 10) for p in spvals]
    fig, ax = plt.subplots()
    plt.clf()
    plt.scatter(exp, obs)
    plt.plot(np.arange(0, max(xMax, yMax)), c="red")
    plt.xlabel("Expected p-value (-log10 scale)")
    plt.ylabel("Observed p-value (-log10 scale)")
    plt.xlim(0, xMax)
    plt.ylim(0, yMax)
    display(fig)

Python makes it easy to make a Q-Q (quantile-quantile) plot.

In [76]:
qqplot(gwas.query_variants('variants.map(v => va.linreg.pval).collect()'),
       5, 6)

## Confounded!

The observed p-values drift away from the expectation immediately. Either every SNP in our dataset is causally linked to caffeine consumption (unlikely), or there’s a confounder.

We didn’t tell you, but sample ancestry was actually used to simulate this phenotype. This leads to a stratified distribution of the phenotype. The solution is to include ancestry as a covariate in our regression.

The linreg method can also take sample annotations to use as covariates. We already annotated our samples with reported ancestry, but it is good to be skeptical of these labels due to human error. Genomes don’t have that problem! Instead of using reported ancestry, we will use genetic ancestry by including computed principal components in our model.

The pca method produces sample PCs in sample annotations, and can also produce variant loadings and global eigenvalues when asked.

In [79]:
pca = common_vds.pca('sa.pca', k=5, eigenvalues='global.eigen')
pprint(pca.globals)

In [80]:
pprint(pca.sample_schema)

Now that we’ve got principal components per sample, we may as well plot them! Human history exerts a strong effect in genetic datasets. Even with a 50MB sequencing dataset, we can recover the major human populations.

In [82]:
plt.clf()
fig, ax = plt.subplots()
pca_table = pca.samples_table().to_pandas()
colors = {'AFR': 'green', 'AMR': 'red', 'EAS': 'black', 'EUR': 'blue', 'SAS': 'cyan'}
plt.scatter(pca_table["sa.pca.PC1"], pca_table["sa.pca.PC2"],
            c = pca_table["sa.SuperPopulation"].map(colors),
            alpha = .5)
plt.xlim(-0.6, 0.6)
plt.xlabel("PC1")
plt.ylabel("PC2")
legend_entries = [mpatches.Patch(color=c, label=pheno) for pheno, c in colors.items()]
plt.legend(handles=legend_entries, loc=2)
display(fig)

Now we can rerun our linear regression, controlling for the first few principal components and sample sex.

In [84]:
pvals = (common_vds
        .annotate_samples_table(pca.samples_table(), expr='sa.pca = table.pca')
        .linreg('sa.CaffeineConsumption',
                covariates=['sa.pca.PC1', 'sa.pca.PC2', 'sa.pca.PC3', 'sa.isFemale'],
                use_dosages=True)
        .query_variants('variants.map(v => va.linreg.pval).collect()'))
qqplot(pvals, 5, 6)

That’s more like it! We may not be publishing ten new coffee-drinking loci in Nature, but we shouldn’t expect to find anything but the strongest signals from a dataset of 1000 individuals anyway.

## Rare Variant Analysis

Hail doesn’t yet have rare variant kernel-based methods, but we have linear and logistic burden tests.

We won’t be showing those here, though. Instead, we’ll demonstrate how one can use the expression language to group and count by any arbitrary properties in variant or sample annotatio

In [88]:
kt = (vds.genotypes_table()
         .aggregate_by_key(key_expr=['pop = sa.SuperPopulation', 'chromosome = v.contig'],
                           agg_expr=['n_het = g.filter(g => g.isHet()).count()']))
kt.to_dataframe().show()

What if we want to group by minor allele frequency bin and hair color, and calculate the mean GQ?

We can convert the keytable to a basic Spark DataFrame using `to_dataframe()` and then call the DataFrame's `show()` method to view the top rows.

In [90]:
kt2 = (vds.genotypes_table()
          .aggregate_by_key(key_expr=['''maf_bin = if (va.qc.AF < 0.01) "< 1%"
                                                   else if (va.qc.AF < 0.05) "1%-5%"
                                                   else "> 5%" ''',
                                     'purple_hair = sa.PurpleHair'],
                           agg_expr=['mean_gq = g.map(g => g.gq).stats().mean',
                                     'mean_dp = g.map(g => g.dp).stats().mean']))

df = # Convert kt2 to a Spark DataFrame and call the show method to view
     # its top rows.

We’ve shown that it’s easy to aggregate by a couple of arbitrary statistics. This specific examples may not provide especially useful pieces of information, but this same pattern can be used to detect effects of rare variation:

Count the number of heterozygous genotypes per gene by functional category (synonymous, missense, or loss-of-function) to estimate per-gene functional constraint
Count the number of singleton loss-of-function mutations per gene in cases and controls to detect genes involved in disease