# 1. Importing pVCFs into Hail and running genotype QC

This script should be run on one chromosome at a time, so the most important part of this is defining the chromosome you're working with. 

Below is the instance type used for each chromosome and the time taken to run this script. The JupyterLab monitoring page on the RAP will often report that only a small fraction of the instance's compute has been used, but this is misleading: jobs failed when given less compute than listed here. I find the Spark UI a much more informative way to monitor running scripts. You can access it at https://job-{jobid}.dnanexus.cloud:8081/jobs/ . The job ID comes from the URL of the JupyterLab instance you're running from, which has the form https://job-{jobid}.dnanexus.cloud . The Spark UI won't work until you have initialised Spark (by running the first code block below).
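
As a small convenience, the Spark UI address can be built from the job ID. This is a minimal sketch: the helper name `spark_ui_url` is made up for illustration, and reading the job ID from a `DX_JOB_ID` environment variable is an assumption about the job environment, so fall back to copying the ID from the notebook URL if it is not set.

```python
import os

def spark_ui_url(job_id: str, port: int = 8081) -> str:
    """Build the Spark UI address for a JupyterLab job on the RAP."""
    # Accept either "job-XXXX" or the bare ID after the "job-" prefix
    if job_id.startswith("job-"):
        job_id = job_id[len("job-"):]
    return f"https://job-{job_id}.dnanexus.cloud:{port}/jobs/"

# Assumption: DX_JOB_ID is set inside the running job; otherwise use a placeholder
job_id = os.environ.get("DX_JOB_ID", "job-xxxx")
print(spark_ui_url(job_id))
```

The printed link only resolves once Spark has been initialised in the notebook.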

For this stage, I generally used mem3_ssd1_v2_x8 instances, increasing the number of nodes as the number of partitions and the size of the chromosome increased. Instances with 8 workers are recommended by Hail, as these are what Hail is developed and tested on. The UKBB RAP allows a maximum of 100 nodes, so for larger chromosomes I moved to mem3_ssd1_v2_x16 instances to get enough workers for the job to complete in a similar time to previous runs (~2 hours). The mem3 instance appears to offer more memory than is typically required (62GB per worker, while the Spark UI suggests peak memory usage is just over 30GB per worker). However, the option below it (mem2, which offers 32GB per worker) often led to jobs failing when I was testing this script, and was therefore not usable.

Finally, for chromosomes 1 and 2, more memory per node than was offered by the mem3 instances was required. These chromosomes thus required the only mem4 option available (mem4_ssd1_x128) which was significantly more costly. I later had issues with chr1 being too large and therefore split it into two halves, so I would recommend doing this at this stage and using the mem3 instance instead to decrease processing costs.

Numbers of variants, instances, nodes, times and costs are listed per chromosome below. From my understanding of Hail, the run times could have been reduced by increasing the number of nodes used.
- Chromosome 1: ~2.28 million loci; using a mem4_ssd1_x128 instance with 12 nodes, this script took ~ hrs, costing £320.
- Chromosome 2: ~1.68 million loci; using a mem4_ssd1_x128 instance with 11 nodes, this script took ~3.5 hrs, costing £280.
- Chromosome 3: ~1.34 million loci; using a mem3_ssd1_v2_x16 instance with 85 nodes, this script took ~1.7 hrs, costing £85.
- Chromosome 4: ~920k loci; using a mem3_ssd1_v2_x8 instance with 100 nodes, this script took ~1.8 hrs, costing £53.
- Chromosome 5: ~1.02 million loci; using a mem3_ssd1_v2_x16 instance with 60 nodes, this script took ~1.8 hrs, costing £63.
- Chromosome 6: ~1.14 million loci; using a mem3_ssd1_v2_x16 instance with 60 nodes, this script took ~1.9 hrs, costing £67.
- Chromosome 7: ~1.09 million loci; using a mem3_ssd1_v2_x16 instance with 60 nodes, this script took ~1.8 hrs, costing £65.
- Chromosome 8: ~820k loci; using a mem3_ssd1_v2_x8 instance with 70 nodes, this script took ~2.3 hrs, costing £46.
- Chromosome 9: ~980k loci; using a mem3_ssd1_v2_x8 instance with 100 nodes, this script took ~2 hrs, costing £58.
- Chromosome 10: ~940k loci; using a mem3_ssd1_v2_x8 instance with 100 nodes, this script took ~1.9 hrs, costing £55.
- Chromosome 11: ~1.34 million loci; using a mem3_ssd1_v2_x16 instance with 85 nodes, this script took ~1.8 hrs, costing £89.
- Chromosome 12: ~1.22 million loci; using a mem3_ssd1_v2_x16 instance with 80 nodes, this script took ~1.8 hrs, costing £72.
- Chromosome 13: ~410k loci; using a mem3_ssd1_v2_x8 instance with 50 nodes, this script took ~1.6 hrs, costing £24.
- Chromosome 14: ~710k loci; using a mem3_ssd1_v2_x8 instance with 70 nodes, this script took ~2 hrs, costing £40.
- Chromosome 15: ~790k loci; using a mem3_ssd1_v2_x8 instance with 70 nodes, this script took ~2.2 hrs, costing £44.
- Chromosome 16: ~1.07 million loci; using a mem3_ssd1_v2_x16 instance with 80 nodes, this script took ~1.6 hrs, costing £68.
- Chromosome 17: ~1.32 million loci; using a mem3_ssd1_v2_x16 instance with 80 nodes, this script took ~2.1 hrs, costing £84.
- Chromosome 18: ~370k loci; using a mem3_ssd1_v2_x8 instance with 20 nodes, this script took ~2.8 hrs, costing £15.
- Chromosome 19: ~1.49 million loci; using a mem3_ssd1_v2_x16 instance with 85 nodes, this script took ~1.9 hrs, costing £95.
- Chromosome 20: ~580k loci; using a mem3_ssd1_v2_x8 instance with 50 nodes, this script took ~2.1 hrs, costing £31.
- Chromosome 21: ~240k loci; using a mem3_ssd1_v2_x8 instance with 20 nodes, this script took ~2.4 hrs, costing £14.
- Chromosome 22: ~520k loci; using a mem3_ssd1_v2_x8 instance with 50 nodes, this script took ~2 hrs, costing £29.
- Chromosome 23 (X): ~570k loci; using a mem3_ssd1_v2_x8 instance with 70 nodes, this script took ~1.5 hrs, costing £30.
- Chromosome 24 (Y): ~10k loci; using a mem2_ssd1_v2_x8 instance with 2 nodes, this script took < 1 hour, costing £0.45.
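
To compare instance types, the figures listed above can be turned into a rough implied cost per node-hour. This is only a back-of-envelope sketch: the run times are approximate, and the function name `cost_per_node_hour` is illustrative rather than part of the original scripts. Three runs are copied from the list as examples.

```python
# (chromosome, instance type, nodes, hours, cost in GBP), copied from the list above
runs = [
    ("2", "mem4_ssd1_x128", 11, 3.5, 280),
    ("3", "mem3_ssd1_v2_x16", 85, 1.7, 85),
    ("4", "mem3_ssd1_v2_x8", 100, 1.8, 53),
]

def cost_per_node_hour(nodes, hours, cost):
    """Implied price of one node for one hour, given the total cost of a run."""
    return cost / (nodes * hours)

for chrom, instance, nodes, hours, cost in runs:
    rate = cost_per_node_hour(nodes, hours, cost)
    print(f"chr{chrom} ({instance}): ~£{rate:.2f} per node-hour")
```

Multiplying an implied rate by a planned number of nodes and hours gives a rough budget for a new run on the same instance type.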

Matrix tables are saved out using DNAX. The matrix tables formed from this stage total 450GB. 


## Set up environment
Make sure you run this block only once. You'll get errors if you try to initialise Hail multiple times. If this happens, restart the kernel and then initialise Hail only once.


In [1]:
# Initialise Hail and Spark. Running this cell will output a red-colored message; this is expected.
# The 'Welcome to Hail' message in the output indicates that Hail is ready to use in the notebook.
import pyspark.sql

# Raise the Kryo serialiser buffer limit. This resolves an error caused by exceeding the allowable buffer size.
config = pyspark.SparkConf().setAll([('spark.kryoserializer.buffer.max', '128')])
sc = pyspark.SparkContext(conf=config) 

from pyspark.sql import SparkSession

import hail as hl
builder = (
    SparkSession
    .builder
    .enableHiveSupport()
)
spark = builder.getOrCreate()
hl.init(sc=sc)

import dxpy

pip-installed Hail requires additional configuration options in Spark referring
  to the path to the Hail Python module directory HAIL_DIR,
  e.g. /path/to/python/site-packages/hail:
    spark.jars=HAIL_DIR/backend/hail-all-spark.jar
    spark.driver.extraClassPath=HAIL_DIR/backend/hail-all-spark.jar
    spark.executor.extraClassPath=./hail-all-spark.jar
Running on Apache Spark version 3.2.3
SparkUI available at http://ip-10-60-8-105.eu-west-2.compute.internal:8081
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.116-cd64e0876c94
LOGGING: writing to /opt/notebooks/hail-20240318-1332-0.2.116-cd64e0876c94.log


## Building matrix tables 

This section processes the pVCFs (the initial format of the UKBB WES data) and builds matrix tables (one per chromosome) from them. 

Here, we locate the WES data and import all data for one chromosome into a Hail matrix table, then write this table out to HDFS and read it back in to speed up later stages.

### Define your variables

In [2]:
# Define the chromosome you're working with 
chr = 1

# Define variables used in import
file_url = f"file:///mnt/project/Bulk/Exome sequences/Population level exome OQFE variants, pVCF format - final release/*_c{chr}_*.vcf.gz"
# For X and Y the file names use the letter rather than the number, so use one of the lines below instead.
#file_url = f"file:///mnt/project/Bulk/Exome sequences/Population level exome OQFE variants, pVCF format - final release/*_cY_*.vcf.gz"
#file_url = f"file:///mnt/project/Bulk/Exome sequences/Population level exome OQFE variants, pVCF format - final release/*_cX_*.vcf.gz"
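
The switch between numeric and letter labels can also be handled in one place. The sketch below is hypothetical (the names `PVCF_DIR` and `pvcf_glob` are not from the original script) and assumes chromosomes 23 and 24 stand for X and Y, as in the per-chromosome list at the top of this document.

```python
# Directory copied from the cell above
PVCF_DIR = ("file:///mnt/project/Bulk/Exome sequences/"
            "Population level exome OQFE variants, pVCF format - final release")

def pvcf_glob(chrom) -> str:
    """Return the glob pattern matching every pVCF block for one chromosome."""
    # Assumption: 23 and 24 are used as shorthand for X and Y
    labels = {23: "X", 24: "Y"}
    label = labels.get(chrom, chrom)
    return f"{PVCF_DIR}/*_c{label}_*.vcf.gz"

print(pvcf_glob(1))
print(pvcf_glob(23))
```

With this helper, the import cell could take its path from `pvcf_glob(chr)` without commenting lines in and out.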

### Import WES data into a matrix table
Here you read in the pVCFs and save them out as a matrix table, which can then be read back into Hail. Saving out the matrix table before doing any work with it speeds up later steps in this script. Importantly, this matrix table is saved into HDFS storage, which is attached to your compute instance but is not saved to your UKBB RAP project. This is useful because the initial form of this mt is large and is not needed in later stages, so it does not need to be copied across to your project.

In [3]:
# write() returns nothing, so there is no need to assign the result to a variable
hl.import_vcf(file_url,
              force_bgz=True,
              reference_genome='GRCh38',
              array_elements_required=False).write(f"./chr_{chr}_initial_mt.mt", overwrite=True)

2024-03-18 13:33:44.749 Hail: INFO: scanning VCF for sortedness...
2024-03-18 14:03:33.057 Hail: INFO: Coerced sorted VCF - no additional import work to do
2024-03-18 15:11:49.564 Hail: INFO: wrote matrix table with 2283839 rows and 469835 columns in 18190 partitions to ./chr_1_initial_mt.mt


Above, the initial matrix table is saved out to HDFS storage only, and is not copied across to your project.

In [4]:
mt=hl.read_matrix_table(f"./chr_{chr}_initial_mt.mt")
print(f"Num partitions: {mt.n_partitions()}")
# Then check if it looks as you'd expect
mt.describe()

Num partitions: 18190
----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
----------------------------------------
Row fields:
    'locus': locus<GRCh38>
    'alleles': array<str>
    'rsid': str
    'qual': float64
    'filters': set<str>
    'info': struct {
        AF: array<float64>, 
        AQ: array<int32>, 
        AC: array<int32>, 
        AN: int32
    }
----------------------------------------
Entry fields:
    'GT': call
    'RNC': array<str>
    'DP': int32
    'AD': array<int32>
    'GQ': int32
    'PL': array<int32>
----------------------------------------
Column key: ['s']
Row key: ['locus', 'alleles']
----------------------------------------


## Genotype quality control
Quality control (QC) is then run on these matrix tables. The resulting matrix table is then filtered for only genotype entries (to save space) and then saved out. 

### Initial QC
Some early genotype QC can be run before the initial save of the matrix tables so you don't need to save such a large file. Note that if you do this, you may need more memory or greater computing power, and the run may take longer than the per-chromosome times listed at the top of this document.

#### Early QC and splitting multi-allelic sites
Firstly, sites labelled in the pVCFs by GLnexus as ‘mono-allelic’ are removed. These are defined as sites representing an alternate allele region with multiple variants that ‘could not be unified into non-overlapping multi-allelic sites’. These typically make up < 1% of sites. Sites with more than 6 alleles are then removed, and the remaining multi-allelic sites are split into biallelic rows.

In [5]:
# First check initial variant counts
print(f'n original variants in chromosome {chr}: {mt.count_rows()}')

# Remove sites labelled as 'monoallelic'
mt=mt.filter_rows(hl.is_missing(mt.filters))
print(f'n variants in chromosome {chr} following removal of those labelled as monoallelic: {mt.count_rows()}')

# Remove sites with > 6 alleles 
mt = mt.filter_rows(mt.alleles.length() <= 6)
print(f'n variants in chromosome {chr} following removal of those with more than 6 alleles: {mt.count_rows()}')

# Split multi-allelic sites 
mt = hl.split_multi_hts(mt)
print(f'n variants in chromosome {chr} following splitting of multiallelic sites: {mt.count_rows()}')
print('Early variant QC complete')

n original variants in chromosome 1: 2283839
n variants in chromosome 1 following removal of those labelled as monoallelic: 2259918
n variants in chromosome 1 following removal of those with more than 6 alleles: 2256227
n variants in chromosome 1 following splitting of multiallelic sites: 2631192
Early variant QC complete
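
The counts above fall at the two filtering steps and rise after splitting, because a site with n alleles becomes n - 1 biallelic rows. The plain-Python toy below mimics only that counting logic on made-up rows; it is not Hail code, the filter label "MONOALLELIC" is an assumed example value, and the real split_multi_hts step does more than count (it also rewrites the genotype fields).

```python
# Toy rows: (filters, alleles). filters is None when no filter was applied to the site.
rows = [
    (None, ["A", "T"]),
    ({"MONOALLELIC"}, ["G", "C"]),                    # assumed filter label
    (None, ["C", "CA", "CAA"]),                       # multi-allelic, 3 alleles
    (None, ["T", "A", "C", "G", "TA", "TC", "TG"]),   # 7 alleles, over the limit
]

def early_qc(rows, max_alleles=6):
    """Return row counts after each of the three early QC steps."""
    # Step 1: keep sites with no filter set
    passed = [r for r in rows if r[0] is None]
    # Step 2: keep sites with at most max_alleles alleles (reference included)
    kept = [r for r in passed if len(r[1]) <= max_alleles]
    # Step 3: one (ref, alt) row per alternate allele
    split = [(ref, alt) for _, (ref, *alts) in kept for alt in alts]
    return len(passed), len(kept), len(split)

print(early_qc(rows))
```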


### Genotype QC

Here, you can run genotype-level quality control to filter the low quality genotypes out. Saving after filtering on genotype and removing additional genotype information means the table you save out is significantly smaller than if you save out before this point. 


Genotypes are removed based on the following criteria:

- Unusual allele balance. This removes genotypes listed as: homozygous reference with > 10% alternate allele reads; homozygous alternate with > 10% reference allele reads; or heterozygous without a reference:alternate allele balance of around 1:1 (alternate allele proportion of < 25% or > 75%).
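- Depth of 10 or less (the filter below keeps only genotypes with DP > 10)
- Genotype quality of 30 or less (the filter below keeps only genotypes with GQ > 30)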


Subsequently, sites with no variants remaining are removed. Entries within the matrix table are then filtered to keep only the genotype (removing metrics such as depth and genotype quality), making the size of the matrix tables more computationally manageable. 

In [6]:
# Filtering on allele balance
# Create the allele balance variable (proportion of reads supporting the alternate allele)
ab=mt.AD[1]/hl.sum(mt.AD)
# Keep only: homozygous reference genotypes with <= 10% alternate allele reads | heterozygous genotypes with an alternate allele proportion of 25-75% (reference:alternate of ~1:1) | homozygous alternate genotypes with <= 10% reference allele reads
filter_condition_ab=((mt.GT.is_hom_ref() & (ab <= 0.1)) | (mt.GT.is_het() & (ab >= 0.25) & (ab <= 0.75)) | (mt.GT.is_hom_var() & (ab >= 0.9)))
mt=mt.filter_entries(filter_condition_ab)
print('Filtering on allele balance completed')
# Keep entries with depth > 10 (this removes DP <= 10)
mt=mt.filter_entries(mt.DP > 10)
print('Filtering on depth completed')
# Keep entries with GQ > 30 (this removes GQ <= 30)
mt=mt.filter_entries(mt.GQ > 30)
print('Filtering on genotype quality completed')

Filtering on allele balance completed
Filtering on depth completed
Filtering on genotype quality completed
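
To make the combined rule easier to check by hand, the same comparisons as in the cell above can be written as a plain-Python predicate for a single biallelic genotype. This is an illustrative sketch, not part of the pipeline: the function name and the string labels for the genotype class are made up, and treating a genotype with zero reads as failing is an assumption.

```python
def passes_genotype_qc(gt, ad, dp, gq):
    """gt: 'hom_ref', 'het' or 'hom_var'; ad: [ref_reads, alt_reads]."""
    total = sum(ad)
    if total == 0:
        return False  # assumption: allele balance is undefined, so the genotype fails
    ab = ad[1] / total  # proportion of reads supporting the alternate allele
    balance_ok = (
        (gt == "hom_ref" and ab <= 0.1)
        or (gt == "het" and 0.25 <= ab <= 0.75)
        or (gt == "hom_var" and ab >= 0.9)
    )
    # Same strict comparisons as the cell above: DP > 10 and GQ > 30
    return balance_ok and dp > 10 and gq > 30

print(passes_genotype_qc("het", [10, 10], dp=20, gq=50))  # balanced het, good depth and GQ
print(passes_genotype_qc("het", [18, 2], dp=20, gq=50))   # het with only 10% alternate reads
```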


In [7]:
mt = hl.variant_qc(mt)
# The next stage (filtering out non-variant sites) runs as a very slow single stage if the mt is not saved out first
# So best to save out at this point in parallel, enabling later stages to run in parallel too.
mt.write(f"./chr_{chr}_postgenoQC_mt.mt", overwrite=True)

2024-03-18 15:30:25.887 Hail: INFO: Ordering unsorted dataset with network shuffle
2024-03-18 16:30:01.538 Hail: INFO: wrote matrix table with 2631192 rows and 469835 columns in 36380 partitions to ./chr_1_postgenoQC_mt.mt


When saving out the second mt above, the storage used roughly doubles.

In [8]:
mt=hl.read_matrix_table(f"./chr_{chr}_postgenoQC_mt.mt")
# Remove sites where only one allele remains following genotype QC (reference allele frequency of 0 or 1)
# When run before the mt was saved out, this was a very slow single stage, but it now runs in parallel.
mt = mt.filter_rows((mt.variant_qc.AF[0] == 0.0) | (mt.variant_qc.AF[0] == 1.0), keep = False)
print(f'n variants in chr {chr} following genotype QC and removal of non-variant rows: {mt.count_rows()}')
print('Genotype QC complete')

n variants in chr 1 following genotype QC and removal of non-variant rows: 2455972
Genotype QC complete
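
The row filter above drops sites whose reference allele frequency is exactly 0 or 1 once the failing genotypes are gone. A plain-Python toy version of that check is sketched below; the function names are illustrative, genotypes are simplified to biallelic (0 = reference, 1 = alternate), and dropping a site with no remaining calls is an assumption of this sketch.

```python
def ref_allele_frequency(calls):
    """calls: list of (a1, a2) tuples, or None for a genotype filtered out by QC."""
    alleles = [a for call in calls if call is not None for a in call]
    if not alleles:
        return None  # no called alleles left at this site
    return alleles.count(0) / len(alleles)

def is_variant_site(calls):
    """True if both the reference and the alternate allele are still observed."""
    af_ref = ref_allele_frequency(calls)
    return af_ref is not None and af_ref not in (0.0, 1.0)

print(is_variant_site([(0, 0), (0, 1), None]))  # one het call survives
print(is_variant_site([(0, 0), None]))          # only reference alleles remain
```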


### Recalculate sample and variant QC metrics following genotype QC

These metrics should have improved following the removal of low quality genotypes.

Once these tables have been written out, you should investigate them in R (as per the scripts on git) to define the filters you'd like to use for variant- and sample-level QC.

In [9]:
# Calculate variant QC 
mt=hl.variant_qc(mt)
mt.describe()

----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
----------------------------------------
Row fields:
    'locus': locus<GRCh38>
    'alleles': array<str>
    'rsid': str
    'qual': float64
    'filters': set<str>
    'info': struct {
        AF: array<float64>, 
        AQ: array<int32>, 
        AC: array<int32>, 
        AN: int32
    }
    'a_index': int32
    'was_split': bool
    'variant_qc': struct {
        dp_stats: struct {
            mean: float64, 
            stdev: float64, 
            min: float64, 
            max: float64
        }, 
        gq_stats: struct {
            mean: float64, 
            stdev: float64, 
            min: float64, 
            max: float64
        }, 
        AC: array<int32>, 
        AF: array<float64>, 
        AN: int32, 
        homozygote_count: array<int32>, 
        call_rate: float64, 
        n_called: int64, 
        n_not_called: int64, 
    

In [10]:
# Calculate sample QC 
mt=hl.sample_qc(mt)
mt.describe()

----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
    'sample_qc': struct {
        dp_stats: struct {
            mean: float64, 
            stdev: float64, 
            min: float64, 
            max: float64
        }, 
        gq_stats: struct {
            mean: float64, 
            stdev: float64, 
            min: float64, 
            max: float64
        }, 
        call_rate: float64, 
        n_called: int64, 
        n_not_called: int64, 
        n_filtered: int64, 
        n_hom_ref: int64, 
        n_het: int64, 
        n_hom_var: int64, 
        n_non_ref: int64, 
        n_singleton: int64, 
        n_snp: int64, 
        n_insertion: int64, 
        n_deletion: int64, 
        n_transition: int64, 
        n_transversion: int64, 
        n_star: int64, 
        r_ti_tv: float64, 
        r_het_hom_var: float64, 
        r_insertion_deletion: float64
    }
----------------------------

In [11]:
var_qc=mt.variant_qc
var_qc.export(f"chr_{chr}_post_geno_QC_var_qc.csv", delimiter=",")
print('Variant QC table written')

2024-03-18 16:37:01.858 Hail: INFO: merging 36381 files totalling 1.2G...
2024-03-18 16:38:05.538 Hail: INFO: while writing:
    chr_1_post_geno_QC_var_qc.csv
  merge time: 1m3.7s


Variant QC table written


In [12]:
qc_tb=mt.cols()
qc_tb.export(f"chr_{chr}_post_geno_QC_sample_qc.csv", delimiter=",")
print('Sample QC table written')

2024-03-18 16:39:21.500 Hail: WARN: cols(): Resulting column table is sorted by 'col_key'.
    To preserve matrix table column order, first unkey columns with 'key_cols_by()'
2024-03-18 16:53:21.209 Hail: INFO: Coerced sorted dataset
2024-03-18 16:53:22.975 Hail: INFO: merging 17 files totalling 250.5M...


Sample QC table written


2024-03-18 16:53:23.967 Hail: INFO: while writing:
    chr_1_post_geno_QC_sample_qc.csv
  merge time: 991.044ms


Copy the output files (which are currently in HDFS storage) into your current environment, and then upload them to your project on the RAP.


In [13]:
%%bash
hdfs dfs -get ./*.csv ./

In [14]:
%%bash
# Upload the CSV files to a directory within your project
dx upload ./*.csv --destination ./sample_and_var_QC_metrics/ 

### Save out a matrix table here

At this stage, you can remove genotype data which is no longer required. Removing this data saves lots of space! Then save out a matrix table for use in future steps. 

As lots of information has been removed, there are now more partitions than are needed. Given that later stages of this process run across one partition at a time, having fewer partitions will speed up processing. The number of partitions should be set manually; I usually used around 1/3 as many partitions as there were when the mt was originally read in (reported in the output of code box [4]).
'shuffle=False' in the repartition command means new partitions are formed only by merging existing partitions. This means the partitions may be uneven in size, so tasks on different partitions will not all finish at the same time in later steps. Remove this argument if you require even partitions, although this makes the repartitioning much more computationally demanding.
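
The two points above (choosing roughly a third of the original partition count, and the uneven sizes that come from merging without a shuffle) can be illustrated with a toy model. This is not how Spark or Hail actually assign partitions; both helper names and the rounding to the nearest thousand are assumptions made for the example.

```python
def target_partitions(n_original, fraction=1/3, round_to=1000):
    """Roughly `fraction` of the original partition count, rounded to a tidy number."""
    return max(round_to, round(n_original * fraction / round_to) * round_to)

def merge_adjacent(sizes, n_target):
    """Toy model of merging without a shuffle: each new partition is a run of
    neighbouring old partitions, so the new sizes depend on what was adjacent."""
    n = len(sizes)
    merged = [0] * n_target
    for i, size in enumerate(sizes):
        merged[i * n_target // n] += size
    return merged

print(target_partitions(18190))  # partition count reported when the mt was first read in
print(merge_adjacent([5, 1, 1, 1, 9, 2, 2, 3, 1, 1], 3))
```

In the second call the merged partitions end up with different totals even though each is built from a similar number of neighbours, which is the unevenness described above.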

In [15]:
# Drop all entries except the genotype
mt=mt.select_entries(mt.GT)
mt.describe()
mt=mt.repartition(6000, shuffle=False)

----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
    'sample_qc': struct {
        dp_stats: struct {
            mean: float64, 
            stdev: float64, 
            min: float64, 
            max: float64
        }, 
        gq_stats: struct {
            mean: float64, 
            stdev: float64, 
            min: float64, 
            max: float64
        }, 
        call_rate: float64, 
        n_called: int64, 
        n_not_called: int64, 
        n_filtered: int64, 
        n_hom_ref: int64, 
        n_het: int64, 
        n_hom_var: int64, 
        n_non_ref: int64, 
        n_singleton: int64, 
        n_snp: int64, 
        n_insertion: int64, 
        n_deletion: int64, 
        n_transition: int64, 
        n_transversion: int64, 
        n_star: int64, 
        r_ti_tv: float64, 
        r_het_hom_var: float64, 
        r_insertion_deletion: float64
    }
----------------------------

In [16]:
# Save out this data table using DNAX 
# Define database and MT names
# Note: It is recommended to only use lowercase letters for the database name.
# If uppercase lettering is used, the database name will be lowercased when creating the database.
db_name = "ukbb_test_wes_matrix_tables_feb23_test"
mt_name = f"chromosome_{chr}_post_genoqc_final.mt"

In [17]:
# Create database in DNAX
stmt = f"CREATE DATABASE IF NOT EXISTS {db_name} LOCATION 'dnax://'"
print(stmt)
spark.sql(stmt).show()

CREATE DATABASE IF NOT EXISTS ukbb_test_wes_matrix_tables_feb23_test LOCATION 'dnax://'
++
||
++
++



In [18]:
# Store MT in DNAX
import dxpy

# Find database ID of newly created database using dxpy method
db_uri = dxpy.find_one_data_object(name=db_name, classname="database")['id']
url = f"dnax://{db_uri}/{mt_name}" # Note: the dnax url must follow this format to properly save MT to DNAX

# Before this step, the Hail MatrixTable is just an object in memory. To persist it and be able to access 
# it later, the notebook needs to write it into a persistent filesystem (in this case DNAX).
mt.write(url) # Note: output should describe size of MT (i.e. number of rows, columns, partitions) 

2024-03-18 17:24:57.610 Hail: INFO: wrote matrix table with 2455972 rows and 469835 columns in 6000 partitions to dnax://database-GgPbpq8J637bkp84VQyQ83X9/chromosome_1_post_genoqc_final.mt
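
The URL format used above can be wrapped in a small helper so the same string is built consistently in later scripts. This is a hypothetical sketch: `dnax_url` and `normalised_db_name` are illustrative names, and the check on the "database-" prefix is an assumption based on the ID shown in the log line above.

```python
def normalised_db_name(name: str) -> str:
    """Database names are lowercased on creation, so lowercase them up front."""
    return name.lower()

def dnax_url(database_id: str, mt_name: str) -> str:
    """Build a dnax:// URL of the form dnax://{database id}/{matrix table name}."""
    # Assumption: database IDs look like "database-XXXX", as in the log output above
    if not database_id.startswith("database-"):
        raise ValueError(f"unexpected database ID: {database_id!r}")
    return f"dnax://{database_id}/{mt_name}"

print(normalised_db_name("UKBB_Test_WES"))
print(dnax_url("database-GgPbpq8J637bkp84VQyQ83X9", "chromosome_1_post_genoqc_final.mt"))
```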


This saved matrix table can then be read into JupyterLab for the next stages of processing. First, the sample- and variant-level QC metrics calculated after genotype QC should be read out of the mt and examined, and cut-offs defined. Once these have been defined, they can be applied to the matrix table to leave only high quality samples and variants in the table.

## Check the mt you've saved out
Check that the mt you just wrote can be read back in, and ensure the count is as you'd expect.

In [19]:
# Check your mt can be read in for later scripts
b=hl.read_matrix_table(f"dnax://database-GgPbpq8J637bkp84VQyQ83X9/chromosome_{chr}_post_genoqc_final.mt")
print(b.count())
b.describe()

(2455972, 469835)
----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
    'sample_qc': struct {
        dp_stats: struct {
            mean: float64, 
            stdev: float64, 
            min: float64, 
            max: float64
        }, 
        gq_stats: struct {
            mean: float64, 
            stdev: float64, 
            min: float64, 
            max: float64
        }, 
        call_rate: float64, 
        n_called: int64, 
        n_not_called: int64, 
        n_filtered: int64, 
        n_hom_ref: int64, 
        n_het: int64, 
        n_hom_var: int64, 
        n_non_ref: int64, 
        n_singleton: int64, 
        n_snp: int64, 
        n_insertion: int64, 
        n_deletion: int64, 
        n_transition: int64, 
        n_transversion: int64, 
        n_star: int64, 
        r_ti_tv: float64, 
        r_het_hom_var: float64, 
        r_insertion_deletion: float64
    }
----------