In [None]:
import os
from glob import glob
from tqdm import tqdm
import datetime
import hail as hl

from hail.plot import show
import pandas as pd
from pprint import pprint
hl.plot.output_notebook()

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

### Spark instance: Read data from Minio bucket

In [None]:
log_file_name = f"logs/hail-{datetime.datetime.now():%Y-%m-%d-%H-%M-%S}.log"
# run spark
spark_conf = SparkConf().setAppName("hail-test")
# .setMaster("spark://spark-master:7077")
spark_conf.set("spark.hadoop.fs.s3a.endpoint", "http://lifemap-minio:9000/")
spark_conf.set("spark.hadoop.fs.s3a.access.key", "root")
spark_conf.set("spark.hadoop.fs.s3a.secret.key", "passpass" )
spark_conf.set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
spark_conf.set("spark.hadoop.fs.s3a.path.style.access", "true")
spark_conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark_conf.set("spark.hadoop.fs.s3a.connection.maximum", 1024);
spark_conf.set("spark.hadoop.fs.s3a.threads.max", 1024);
spark_conf.set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc = SparkContext(conf=spark_conf)

In [None]:
#### create bucket if it does not exists

In [None]:
import boto3
from botocore.exceptions import NoCredentialsError

# S3 configuration
s3 = boto3.client(
    's3',
    endpoint_url="http://lifemap-minio:9000",
    aws_access_key_id="root",
    aws_secret_access_key="passpass",
)

bucket_name = "data-hail"

# Check if the bucket exists, if not, create it
try:
    s3.head_bucket(Bucket=bucket_name)
    print(f"Bucket '{bucket_name}' exists.")
except Exception:
    # If the bucket does not exist, create it
    s3.create_bucket(Bucket=bucket_name)
    print(f"Bucket '{bucket_name}' created.")


### Init Hail

In [None]:
hl.init(sc=sc, log=log_file_name)

#### Import and matrix table filenames

In [None]:
# test read
vcf_fn = 'data/1kg.vcf'
annotations_fn = 'data/1kg_annotations.txt'
mt_fn = 's3://data-hail/1kg.mt'

print (f"Input fn: {vcf_fn}")
print (f"Matrix table fn: {mt_fn}")

#### Reading vcf with Pandas (N/A if the vcf is stored on s3)

In [None]:
vcf_pd = None
if "s3://" not in vcf_fn: 
    vcf_pd = pd.read_csv(vcf_fn, sep="\t", header=109)
vcf_pd

### Import VCF to hail as Matrix Table

In [None]:
## Read a vcf file and write it as matrix table
_ = hl.import_vcf(vcf_fn).write(mt_fn, overwrite=True) # assign this to a dummy variable to avoid errors

In [None]:
## Read the matrix table from the file and assign it to the mt vaiable
mt = hl.read_matrix_table(mt_fn)

In [None]:
## Summary of the table. This shows all matrix table component:
# 1) global fields
# 2) column fields (they are rows of the column table)
# 3) row fields (the columns of the row table)
# 3) entry fields (the columns of the entry table)
mt.describe()

## Counts of samples and variants
print (" Printing row and column fields with different calls")
# count row fields
print(mt.count_rows())
# same as
print (mt.rows().count())

# count column fields
print(mt.count_cols())
# same as
print(mt.cols().count())

n_variants = mt.count_rows() # This can be done accessing directy the row table with mt.rows().count()
n_samples = mt.count_cols()  # This can be done accessing directy the column table with mt.cols().count()

print (f"\n\nTable has {n_variants} variants and {n_samples} samples") 

#### Row table:
Contains all vcf columns except FORMAT and samples. Each row data is constant for the columns (samples) of the dataset 

In [None]:
mt.rows().show()

#### Column table
Contains sample ids as rows. It can have also columns that represents data fields specific for each sample, so constant for each row of the dataset 

In [None]:
mt.cols().show()

#### Entry table.
The entry table has a specific value for the pair (variant, sample) for the fieds specified in the vcf format column.   

In [None]:
mt.entries().show()

#### Global values.
Common values of the matrix table

In [None]:
mt.globals_table().show()

#### Getting to know our data

In this case the matrix array data structure does not have global fields but has got column and row fields. 

- row fields: each row specifies a variants and row fields are informations common to that variants (for examples alleles or quality).
- column fields: each row is specific for a sample and, in this case, specifies the sample ID
- entry fields: they are specific data for a pair (variant, sample), for example the genotype read for that particular sample.
- column key: it is the column field used for join operations
- row key: it is the row field used for join operation 

In [None]:
## mt.rows() gets the row table. Using select() without arguments returns the table key columns as specified in the documentation:
## Select methods will always preserve the key along that axis; e.g. for Table.select(), the table key will aways be kept. To modify the key, use key_by().
mt.rows().select().show()

In [None]:
# The same can be done by using the matrix table select_row method directly. 
mt.select_rows().show()

#### Show attributes of entry fields. An example with the genotype field

In [None]:
gt_expr = mt.GT # Takes the GT entry field for all samples 
gt_expr.phased.show() # Show the phased attribute of the GT field (It is False for not phased haplotypes)

In [None]:
gt_expr.ploidy.show()

In [None]:
gt_expr.summarize()

#### The entries method

In [None]:
entry_structure = mt.entry
# To show all entry field names
list(entry_structure)

In [None]:
# entries method returns a matrix in coordinate table form (It is a hail table). Take a look to the API for typical applications
# Rouhgly speaking, entries makes a flattening version of the data structure and allows to perform 
# grouping on rows and cols simultaneously 
mt_entries = mt.entries()
mt_entries.describe()

In [None]:
## The first n_sample rows deal with the info of the first variant for each sample.
## Here first 10 entries. It can be seen from the column locus (it has the same value) and the
## s column that changes to specify the sample ID.
## The n_samples + 1 row shows the next variant for the first sample ID.
mt_entries.show(n_samples + 1)

#### Showing rows, cols and entry field data

It’s important to have easy ways to slice, dice, query, and summarize a dataset. Some of this functionality is demonstrated below.

The rows method can be used to get a table with all the row fields in our MatrixTable.

We can use rows along with select to pull out 5 variants. The select method takes either a string refering to a field name in the table, or a Hail Expression. Here, we leave the arguments blank to keep only the row key fields, locus and alleles.

Use the show method to display the variants.

In [None]:
mt.rows().select().show(5)

In [None]:
## the same with 
mt.row_key.show(5)

In [None]:
## The matrix table has also the colum dimension with the sample IDS
## To peek at the first few sample IDs
## s is the ID field

mt.s.show()

To look at the first few genotype calls, we can use entries along with select and take. The **take method collects the first n rows into a list**. Alternatively, we can use the show method, which prints the first n rows to the console in a table format.

**Enrty method returns the list of all entry fields sequentially**. If the number of samples is M, taking M+1 entry elements returns the first variant of all samples plus the second variant of the first sample.
Try changing take to show in the cell below.

In [None]:
# Take takes the first 10 entries 
mt.entry.take(10)

In [None]:
# Show the first 10 variants of the first sample
mt.entry.show(10)

### Hail Table object: Adding column fields to the matrix table starting from a metadata Table (annotations)

Metadata of samples (like phenotypes or geographical origin) are saved in a separate txt file imported to Hail as a Table.

A Hail MatrixTable can have any number of row fields and column fields for storing data associated with each row and column. Annotations are usually a critical part of any genetic study. **Column fields are where you’ll store information about sample phenotypes, ancestry, sex, and covariates. Row fields can be used to store information like gene membership and functional impact for use in QC or analysis**.

In this tutorial, we demonstrate how to take a text file and use it to annotate the columns in a MatrixTable.

The file provided contains the sample ID, the population and “super-population” designations, the sample sex, and two simulated phenotypes (one binary, one discrete).

This file can be imported into Hail with **import_table**. This function produces a **Table object**. Think of this as a Pandas or R dataframe that isn’t limited by the memory on your machine – behind the scenes, it’s distributed with Spark. **The Table object, like the matrix table object, is immutable**. To interact with it locally as a Python datastructure, you should use the **take** method or transform to a Pandas dataframe.

**Table can have global field and row fields**. Global field is common for each element of the table whereas row fields are specific for each row. In this case, each row refers specific attributes of a sample. For example, its ID is specified in the "Sample" row field whereas the other fields show its metadata. 

In [None]:
annotation_table = (hl.import_table(annotations_fn, impute=True)
         .key_by('Sample'))

annotation_table.describe()

In [None]:
annotation_table.show()

#### Query functions and the Hail Expression Language

Hail has a number of useful query functions that can be used for gathering statistics on our dataset. These query functions take Hail Expressions as arguments.

We will start by looking at some statistics of the information in our table. The **aggregate** method can be used to aggregate over rows of the table.

**counter** is an aggregation function that counts the number of occurrences of each unique element. We can use this to pull out the population distribution by passing in a Hail Expression for the field that we want to count by.

The aggregate method is then used to aggregate something in the table across different rows and the aggregate function like counter, stats, etc... are used to specify how and what to aggregate.

In [None]:
## Population distribution
## Here counter counts unique geographycal origin label

aggregate_expression = hl.agg.counter(annotation_table.SuperPopulation)
pprint(annotation_table.aggregate(aggregate_expression))

**stats** is an aggregation function that produces some useful statistics about numeric collections. We can use this to see the distribution of the CaffeineConsumption phenotype.

In [None]:
## Stats perform some statistics on the specified field
## Here take stats of the caffeine consumption

aggregate_expression = hl.agg.stats(annotation_table.CaffeineConsumption)
pprint(annotation_table.aggregate(aggregate_expression))

The functionality demonstrated in the last few cells isn’t anything especially new: **it’s certainly not difficult to ask these questions with Pandas or R dataframes, or even Unix tools like awk. But Hail can use the same interfaces and query language to analyze collections that are much larger, like the set of variants.**

### Join sample annotations with the matrix table

Using the **annotate_cols** method is possible to join the annotation table with the MatrixTable containing our dataset.
 First, we’ll print the existing column schema:

In [None]:
# Column table before adding per sample annotation
mt.col.describe()

In [None]:
mt = mt.annotate_cols(pheno = annotation_table[mt.s])

# After the annotation the columns has a new field pheno,
# a struct that contains sample metadata
mt.col.describe()

Each column "name" now specifies the sample ID and its phenotype. Values for each column are still the entry field values (for example Genotyping values (GT))  

Here we calculate the counts of each of the 12 possible unique SNPs (4 choices for the reference base * 3 choices for the alternate base).

To do this, we need to get the alternate allele of each variant and then count the occurences of each unique ref/alt pair. This can be done with Hail’s **counter function**.

In [None]:
mt.alleles.show()

In [None]:
struct_expr = hl.Struct(ref=mt.alleles[0], alt=mt.alleles[1])
struct_expr.get('alt').take(10)

In [None]:
struct_expr = hl.Struct(ref=mt.alleles[0], alt=mt.alleles[1])
snp_counts = mt.aggregate_rows(hl.agg.counter(struct_expr))
pprint(snp_counts)

# We can list the counts in descending order using Python’s Counter class.
from collections import Counter
counts = Counter(snp_counts)
counts.most_common()

**The same Python, R, and Unix tools could do this work as well, but we’re starting to hit a wall - the latest gnomAD release publishes about 250 million variants, and that won’t fit in memory on a single computer.**

What about genotypes? Hail can query the collection of all genotypes in the dataset, and this is getting large even for our tiny dataset. Our 284 samples and 10,000 variants produce 10 million unique genotypes. The gnomAD dataset has about 5 trillion unique genotypes.

Hail plotting functions allow Hail fields as arguments, so we can pass in the DP field directly here. If the range and bins arguments are not set, this function will compute the range based on minimum and maximum values of the field and use the default 50 bins.

In [None]:
p = hl.plot.histogram(mt.DP, range=(0,30), bins=30, title='DP Histogram', legend='DP')
show(p)

### Quality control

QC is where analysts spend most of their time with sequencing datasets. QC is an iterative process, and is different for every project: there is no “push-button” solution for QC. Each time the Broad collects a new group of samples, it finds new batch effects. However, by practicing open science and discussing the QC process and decisions with others, we can establish a set of best practices as a community.

QC is entirely based on the ability to understand the properties of a dataset. Hail attempts to make this easier by providing the sample_qc function, which produces a set of useful metrics and stores them in a column field.

In [None]:
mt.col.describe()

In [None]:
# sample_qc is a hail genetic method to compute per-sample metrics useful for quality control.
mt = hl.sample_qc(mt)
mt.col.describe()

In [None]:
##Plotting the QC metrics is a good place to start.

p = hl.plot.histogram(mt.sample_qc.call_rate, range=(.88,1), legend='Call Rate')
show(p)

In [None]:
p = hl.plot.histogram(mt.sample_qc.n_not_called, legend='Not called')
show(p)

In [None]:
p = hl.plot.histogram(mt.sample_qc.gq_stats.mean, range=(10,70), legend='Mean Sample GQ')
show(p)

In [None]:
## Checking corralations between the mean value of dp and the call rate
p = hl.plot.scatter(mt.sample_qc.dp_stats.mean, mt.sample_qc.call_rate, xlabel='Mean DP', ylabel='Call Rate')
show(p)

#### Removing outliers

Removing outliers from the dataset will generally improve association results. We can make arbitrary cutoffs and use them to filter:

In [None]:
## It creates a new matrix table considering samples with the DP mean >= 4 and a call rate >= 0.97
## samples that don't satisfy these criteria are removed 

mt = mt.filter_cols((mt.sample_qc.dp_stats.mean >= 4) & (mt.sample_qc.call_rate >= 0.97))
print('After filter, %d/284 samples remain.' % mt.count_cols())

Next is genotype QC. It’s a good idea to filter out genotypes where the reads aren’t where they should be: if we find a genotype called homozygous reference with >10% alternate reads, a genotype called homozygous alternate with >10% reference reads, or a genotype called heterozygote without a ref / alt balance near 1:1, it is likely to be an error.

In a low-depth dataset like 1KG, it is hard to detect bad genotypes using this metric, since a read ratio of 1 alt to 10 reference can easily be explained by binomial sampling. However, in a high-depth dataset, a read ratio of 10:100 is a sure cause for concern!



In [None]:
ab = mt.AD[1] / hl.sum(mt.AD)

filter_condition_ab = ((mt.GT.is_hom_ref() & (ab <= 0.1)) |
                        (mt.GT.is_het() & (ab >= 0.25) & (ab <= 0.75)) |
                        (mt.GT.is_hom_var() & (ab >= 0.9)))

fraction_filtered = mt.aggregate_entries(hl.agg.fraction(~filter_condition_ab))
print(f'Filtering {fraction_filtered * 100:.2f}% entries out of downstream analysis.')
mt = mt.filter_entries(filter_condition_ab)

Variant QC computes per per-variant metric useful for quality control. It is a bit more of the same of sample_qc: we can use the variant_qc function to produce a variety of useful statistics, plot them, and filter. This is made at row level beacause they are stats on variants.

These statistics actually look pretty good: we don’t need to filter this dataset. Most datasets require thoughtful quality control, though. The filter_rows method can help!

In [None]:
mt = hl.variant_qc(mt)
mt.row.describe()

In [None]:
p = hl.plot.histogram(mt.variant_qc.call_rate, legend='Variant QC call rate')
show(p)

In [None]:
p = hl.plot.histogram(mt.variant_qc.dp_stats.mean, legend='Variant QC DP')
show(p)

## Let’s do a GWAS!

First, we need to restrict to variants that are :

- common (we’ll use a cutoff of 1%)
- not so far from Hardy-Weinberg equilibrium as to suggest sequencing error

In [None]:
mt = mt.filter_rows(mt.variant_qc.AF[1] > 0.01) # It takes variants for which the alternate allele has a frequency larger than 1%
print('Samples: %d  Variants: %d' % (mt.count_cols(), mt.count_rows()))

mt = mt.filter_rows(mt.variant_qc.p_value_hwe > 1e-6) # Hardy-Weinberg equilibrium pvalue cut-off
print('Samples: %d  Variants: %d' % (mt.count_cols(), mt.count_rows()))

These filters removed about 15% of sites (we started with a bit over 10,000). This is NOT representative of most sequencing datasets! We have already downsampled the full thousand genomes dataset to include more common variants than we’d expect by chance.

In Hail, the association tests accept column fields for the sample phenotype and covariates. Since we’ve already got our phenotype of interest (caffeine consumption) in the dataset, we are good to go:

In [None]:
gwas = hl.linear_regression_rows(y=mt.pheno.CaffeineConsumption,
                                 x=mt.GT.n_alt_alleles(),
                                 covariates=[1.0])
gwas.row.describe()

Looking at the bottom of the above printout, you can see the linear regression adds new row fields for the beta, standard error, t-statistic, and p-value.

Hail makes it easy to visualize results! Let’s make a Manhattan plot:

In [None]:
p = hl.plot.manhattan(gwas.p_value)
show(p)

This doesn’t look like much of a skyline. Let’s check whether our GWAS was well controlled using a Q-Q (quantile-quantile) plot.

In [None]:
p = hl.plot.qq(gwas.p_value)
show(p)

## Confounded!

The observed p-values drift away from the expectation immediately. Either every SNP in our dataset is causally linked to caffeine consumption (unlikely), or there’s a confounder.

We didn’t tell you, but sample ancestry was actually used to simulate this phenotype. This leads to a **stratified distribution** of the phenotype. The solution is to include ancestry as a covariate in our regression.

The **linear_regression_rows** function can also take column fields to use as covariates. We already annotated our samples with reported ancestry, but it is good to be skeptical of these labels due to human error. Genomes don’t have that problem! Instead of using reported ancestry, we will use genetic ancestry by including computed principal components in our model.

The **pca** function produces eigenvalues as a list and sample PCs as a Table, and can also produce variant loadings when asked. The **hwe_normalized_pca** function does the same, using HWE-normalized genotypes for the PCA.

In [None]:
mt.GT.show()

In [None]:
eigenvalues, pcs, _ = hl.hwe_normalized_pca(mt.GT)

In [None]:
pprint(eigenvalues)

In [None]:
pcs.show(5, width=100)

Now that we’ve got principal components per sample, we may as well plot them! Human history exerts a strong effect in genetic datasets. Even with a 50MB sequencing dataset, we can recover the major human populations.

In [None]:
mt = mt.annotate_cols(scores = pcs[mt.s].scores)

In [None]:
p = hl.plot.scatter(mt.scores[0],
                    mt.scores[1],
                    label=mt.pheno.SuperPopulation,
                    title='PCA', xlabel='PC1', ylabel='PC2')
show(p)

Now we can rerun our linear regression, controlling for sample sex and the first few principal components. We’ll do this with input variable the number of alternate alleles as before, and again with input variable the genotype dosage derived from the PL field.

In [None]:
gwas = hl.linear_regression_rows(
    y=mt.pheno.CaffeineConsumption,
    x=mt.GT.n_alt_alleles(),
    covariates=[1.0, mt.pheno.isFemale, mt.scores[0], mt.scores[1], mt.scores[2]])


In [None]:
gwas.show()

We’ll first make a Q-Q plot to assess inflation…

In [None]:
p = hl.plot.qq(gwas.p_value)
show(p)

That’s more like it! This shape is indicative of a well-controlled (but not especially well-powered) study. And now for the Manhattan plot:

In [None]:
p = hl.plot.manhattan(gwas.p_value)
show(p)

We have found a caffeine consumption locus!

## Rare variant analysis
Here we’ll demonstrate how one can use the expression language to group and count by any arbitrary properties in row and column fields. Hail also implements the sequence kernel association test (SKAT).

In [None]:
entries = mt.entries()
results = (entries.group_by(pop = entries.pheno.SuperPopulation, chromosome = entries.locus.contig)
      .aggregate(n_het = hl.agg.count_where(entries.GT.is_het())))

results.show()

We use the **MatrixTable.entries** method to convert our matrix table to a table (with one row for each sample for each variant). In this representation, it is easy to aggregate over any fields we like, which is often the first step of rare variant analysis.

What if we want to group by minor allele frequency bin and hair color, and calculate the mean GQ?

In [None]:
entries = entries.annotate(maf_bin = hl.if_else(entries.info.AF[0]<0.01, "< 1%",
                             hl.if_else(entries.info.AF[0]<0.05, "1%-5%", ">5%")))

results2 = (entries.group_by(af_bin = entries.maf_bin, purple_hair = entries.pheno.PurpleHair)
      .aggregate(mean_gq = hl.agg.stats(entries.GQ).mean,
                 mean_dp = hl.agg.stats(entries.DP).mean))

results2.show()

We’ve shown that it’s easy to aggregate by a couple of arbitrary statistics. This specific examples may not provide especially useful pieces of information, but this same pattern can be used to detect effects of rare variation:

Count the number of heterozygous genotypes per gene by functional category (synonymous, missense, or loss-of-function) to estimate per-gene functional constraint

Count the number of singleton loss-of-function mutations per gene in cases and controls to detect genes involved in disease