# Merge variants

In this notebook, we merge the AoU and UKB variants.

# Setup 

In [None]:
from datetime import datetime
import hail as hl
import os
import time

## Define constants

<div class="alert alert-block alert-info">
<b>Note:</b> The AoU matrix table for the alpha2 release was created via notebook 'create_matrix_tables.ipynb'. It contains all samples and variants for the alpha2 release.
</div>

In [None]:
# Path of the AoU alpha2 cohort matrix table under the workspace bucket.
# NOTE(review): os.getenv returns None when WORKSPACE_BUCKET is unset, which would make this path start with 'None/'.
AOU_MT = f'{os.getenv("WORKSPACE_BUCKET")}/data/aou/alpha2/cohort.mt'

<div class="alert alert-block alert-info">
<b>Note:</b> The UKB matrix table was created via notebook 'create_matrix_tables' and then repartitioned via notebook 'redo_partitions'.
</div>

In [None]:
# Path of the repartitioned UKB exomes matrix table under the workspace bucket.
# NOTE(review): os.getenv returns None when WORKSPACE_BUCKET is unset, which would make this path start with 'None/'.
UKB_MT = f'{os.getenv("WORKSPACE_BUCKET")}/data/ukb/exomes/full_dataset_fewer_partitions.mt'

In [None]:
# Number of partitions requested when computing new partition intervals from the UKB matrix table.
# The resulting intervals are reused when reading both the UKB and the AoU matrix tables,
# so the two datasets are read with the same partition boundaries.
NUM_MT_READ_PARTITIONS = 7500

In [None]:
# BED file of exome capture regions (GRCh38, per the file name); imported with hl.import_bed
# and used to restrict both datasets to variants that fall inside a capture region.
EXOME_REGIONS = f'{os.getenv("WORKSPACE_BUCKET")}/data/ukb/exomes/xgen_plus_spikein.GRCh38.bed'

In [None]:
# Contigs chr6 through chr20 (the upper bound of range() is exclusive).
INTERVALS_TO_EXAMINE = ['chr' + str(chrom_number) for chrom_number in range(6, 21)]
# File-name-friendly label for the intervals: any ':' in an interval string becomes 'range',
# and the pieces are joined with '_'. This label is embedded in the output paths.
INTERVALS_TO_EXAMINE_NAME = '_'.join(
    interval.replace(':', 'range') for interval in INTERVALS_TO_EXAMINE
)

In [None]:
RESULT_BUCKET = os.getenv("WORKSPACE_BUCKET")
DATESTAMP = time.strftime('%Y%m%d')
TIMESTAMP = time.strftime('%Y%m%d_%H%M%S')
WORK_DIR = !pwd

# Outputs
AOU_ROWS_TAB = f'{os.getenv("WORKSPACE_BUCKET")}/data/merged/{DATESTAMP}/aou_rows-{INTERVALS_TO_EXAMINE_NAME}.tab'
UKB_ROWS_TAB = f'{os.getenv("WORKSPACE_BUCKET")}/data/merged/{DATESTAMP}/ukb_rows-{INTERVALS_TO_EXAMINE_NAME}.tab'
MERGED_MT = f'{os.getenv("WORKSPACE_BUCKET")}/data/merged/{DATESTAMP}/merged-filtered-{INTERVALS_TO_EXAMINE_NAME}.mt'
AOU_ONLY_TAB = f'{os.getenv("WORKSPACE_BUCKET")}/data/merged/{DATESTAMP}/aou_only-filtered-{INTERVALS_TO_EXAMINE_NAME}.tab'
UKB_ONLY_TAB = f'{os.getenv("WORKSPACE_BUCKET")}/data/merged/{DATESTAMP}/ukb_only-filtered-{INTERVALS_TO_EXAMINE_NAME}.tab'
HAIL_LOG = f'{WORK_DIR[0]}/hail-merge-variants-{TIMESTAMP}.log'
HAIL_LOG_DIR_FOR_PROVENANCE = f'{os.getenv("WORKSPACE_BUCKET")}/hail-logs/{DATESTAMP}/'

In [None]:
# Echo the output locations (one per line) so they are recorded in the notebook output.
print(MERGED_MT, AOU_ONLY_TAB, UKB_ONLY_TAB, HAIL_LOG, sep='\n')

## Check access

In [None]:
# List the AoU matrix table path to confirm it is reachable before starting Hail.
!gsutil ls {AOU_MT}

In [None]:
# List the UKB matrix table path to confirm it is reachable before starting Hail.
!gsutil ls {UKB_MT}

## Start Hail 

In [None]:
# Spark settings passed to hl.init(spark_conf=...). Each one raises a retry or timeout limit
# above its documented default (defaults are noted inline).
# See also https://towardsdatascience.com/fetch-failed-exception-in-apache-spark-decrypting-the-most-common-causes-b8dff21075c
# See https://spark.apache.org/docs/2.4.7/configuration.html

EXTRA_SPARK_CONFIG = {
    # If set to "true", performs speculative execution of tasks. This means if one or more tasks are running
    # slowly in a stage, they will be re-launched.
    'spark.speculation': 'true', # Default is false.
    
    # Fraction of tasks which must be complete before speculation is enabled for a particular stage.
    'spark.speculation.quantile': '0.95', # Default is 0.75

    # Default timeout for all network interactions. This config will be used in place of 
    # spark.core.connection.ack.wait.timeout, spark.storage.blockManagerSlaveTimeoutMs, 
    # spark.shuffle.io.connectionTimeout, spark.rpc.askTimeout or spark.rpc.lookupTimeout if they are not configured.
    'spark.network.timeout': '180s', # Default is 120s
        
    # (Netty only) Fetches that fail due to IO-related exceptions are automatically retried if this is set to a
    # non-zero value. This retry logic helps stabilize large shuffles in the face of long GC pauses or transient
    # network connectivity issues.
    'spark.shuffle.io.maxRetries': '10',  # Default is 3
    
    # (Netty only) How long to wait between retries of fetches. The maximum delay caused by retrying is 15 seconds
    # by default, calculated as maxRetries * retryWait.
    # With the values set here (10 retries * 15s) that maximum delay becomes 150 seconds.
    'spark.shuffle.io.retryWait': '15s',  # Default is 5s
    
    # Number of failures of any particular task before giving up on the job. The total number of failures spread
    # across different tasks will not cause the job to fail; a particular task has to fail this number of attempts.
    # Should be greater than or equal to 1. Number of allowed retries = this value - 1.
    'spark.task.maxFailures': '10', # Default is 4.

    # Number of consecutive stage attempts allowed before a stage is aborted.
    'spark.stage.maxConsecutiveAttempts': '10' # Default is 4.
}

In [None]:
# Start Hail with the extra Spark settings, GRCh38 as the default reference genome,
# and the per-run log file defined in the constants section.
# NOTE(review): min_block_size is presumably expressed in MB per Hail's init documentation — confirm.
hl.init(spark_conf=EXTRA_SPARK_CONFIG,
        min_block_size=50,
        default_reference='GRCh38',
        log=HAIL_LOG)

Check the configuration.

In [None]:
# Display the effective Spark configuration, sorted, so the settings passed to hl.init can be verified.
# Note: this reads the private `_conf` attribute of the Spark context.
sc = hl.spark_context()
config = sorted(sc._conf.getAll())
config

# Load exome capture regions

In [None]:
# Import the exome capture regions BED file as a Hail table.
ukb_exome_capture_regions = hl.import_bed(EXOME_REGIONS)

In [None]:
# Show the schema of the capture regions table.
ukb_exome_capture_regions.describe()

In [None]:
# Tally the capture regions by the contig of each interval's start position.
ukb_exome_capture_regions.aggregate(hl.agg.counter(ukb_exome_capture_regions.interval.start.contig))

In [None]:
# Preview the first 5 capture regions.
ukb_exome_capture_regions.show(5)

# Read UKB exomes matrix table

In [None]:
# Initial read of the UKB matrix table with its stored partitioning; this handle is used to
# inspect the partition count and to compute the new partition intervals.
ukb_exomes = hl.read_matrix_table(UKB_MT)

In [None]:
# Report the number of partitions of the UKB matrix table as stored.
ukb_exomes.n_partitions()

In [None]:
# Compute partition intervals for NUM_MT_READ_PARTITIONS partitions from the UKB matrix table.
# These intervals are passed to read_matrix_table for both the UKB and the AoU matrix tables.
# Note: _calculate_new_partitions is a private Hail method (leading underscore); see the thread below.
# https://discuss.hail.is/t/improving-pipeline-performance/1344
new_partitions = ukb_exomes._calculate_new_partitions(NUM_MT_READ_PARTITIONS)

In [None]:
# Re-read the UKB matrix table using the computed partition intervals, replacing the initial handle.
ukb_exomes = hl.read_matrix_table(UKB_MT, _intervals=new_partitions)

In [None]:
# Show the schema of the UKB matrix table.
ukb_exomes.describe()

## Filter to include only our genomic intervals of interest

In [None]:
# Parse each entry of INTERVALS_TO_EXAMINE into a Hail locus interval, then keep only the
# UKB rows that fall inside one of those intervals (keep=True).
ukb_intervals_of_interest = list(map(hl.parse_locus_interval, INTERVALS_TO_EXAMINE))
ukb_exomes = hl.filter_intervals(ukb_exomes, ukb_intervals_of_interest, keep=True)

## Filter to include only exonic variants

In [None]:
# Make the shuffle happen all at once by materializing the relevant row keys to a file:
# keep the rows whose locus falls inside an exome capture region, drop all non-key row fields
# (select() with no arguments), and checkpoint the resulting keys.
# overwrite=True lets the notebook be re-run on the same date (the checkpoint path is only
# date-stamped), consistent with the overwrite=True used for the notebook's final outputs.
relevant_ukb_rows = (
    ukb_exomes
    .filter_rows(hl.is_defined(ukb_exome_capture_regions[ukb_exomes.locus]))
    .rows()
    .select()
    .checkpoint(UKB_ROWS_TAB, overwrite=True)
)
# Restrict the matrix table to the checkpointed keys.
ukb_exomes = ukb_exomes.semi_join_rows(relevant_ukb_rows) # alias for filter_rows(hl.is_defined ... )

# Read AoU matrix table

In [None]:
# Read the AoU matrix table using the partition intervals that were computed from the UKB
# matrix table, so both datasets are read with the same partition boundaries.
aou_wgs = hl.read_matrix_table(AOU_MT, _intervals=new_partitions)

In [None]:
# Report the number of partitions of the AoU matrix table as read.
aou_wgs.n_partitions()

In [None]:
# Show the schema of the AoU matrix table.
aou_wgs.describe()

## Filter to include only our genomic intervals of interest

In [None]:
# Parse each entry of INTERVALS_TO_EXAMINE into a Hail locus interval, then keep only the
# AoU rows that fall inside one of those intervals (keep=True).
aou_intervals_of_interest = list(map(hl.parse_locus_interval, INTERVALS_TO_EXAMINE))
aou_wgs = hl.filter_intervals(aou_wgs, aou_intervals_of_interest, keep=True)

## Filter to include only exonic variants

In [None]:
# Make the shuffle happen all at once by materializing the relevant row keys to a file:
# keep the rows whose locus falls inside an exome capture region, drop all non-key row fields
# (select() with no arguments), and checkpoint the resulting keys.
# overwrite=True lets the notebook be re-run on the same date (the checkpoint path is only
# date-stamped), consistent with the overwrite=True used for the notebook's final outputs.
relevant_aou_rows = (
    aou_wgs
    .filter_rows(hl.is_defined(ukb_exome_capture_regions[aou_wgs.locus]))
    .rows()
    .select()
    .checkpoint(AOU_ROWS_TAB, overwrite=True)
)
# Restrict the matrix table to the checkpointed keys.
aou_wgs = aou_wgs.semi_join_rows(relevant_aou_rows) # alias for filter_rows(hl.is_defined ... )

# Omit samples that fail QC thresholds

TODO

# Omit variants that fail QC thresholds

TODO: more here

## Omit variants with filter flags

In [None]:
# Keep only the AoU rows whose `filters` collection is empty.
# NOTE(review): rows where `filters` is missing presumably evaluate to a missing predicate and are
# dropped as well — confirm this is intended, since the UKB filter keeps rows with missing `filters`.
aou_wgs = aou_wgs.filter_rows(hl.len(aou_wgs.filters) == 0)

In [None]:
# Keep only the UKB rows whose `filters` field is missing.
# NOTE(review): this differs from the AoU predicate (empty `filters`); presumably the two callsets
# encode "no filter flags" differently — confirm, because rows with an empty `filters` are dropped here.
ukb_exomes = ukb_exomes.filter_rows(hl.is_missing(ukb_exomes.filters))

# Perform the merge

In [None]:
# Record when the merge started; the provenance section reports the total elapsed time from this value.
start_all = datetime.now()
print(start_all)

## Split the multi-allelic sites

See also https://hail.is/docs/0.2/methods/genetics.html#hail.methods.split_multi_hts

In [None]:
# Biallelic sites need no splitting: for efficiency they bypass split_multi_hts and simply
# receive the corresponding `a_index` / `was_split` row annotations.
aou_bi = (
    aou_wgs
    .filter_rows(hl.len(aou_wgs.alleles) == 2)
    .annotate_rows(a_index=1, was_split=False)
)

# Only sites with more than two alleles are split into biallelic sites.
aou_multi = aou_wgs.filter_rows(hl.len(aou_wgs.alleles) > 2)
aou_split = hl.split_multi_hts(
    aou_multi,
    keep_star=False,
    left_aligned=False,
    vep_root='vep',
    permit_shuffle=False,
)

# Recombine both subsets, label every column with its cohort, and key columns by (s, cohort).
aou_prepared = aou_split.union_rows(aou_bi).annotate_cols(cohort='AOU')
aou_prepared = aou_prepared.key_cols_by('s', 'cohort')

# Keep only the GT entry field and the row fields that are needed, renamed with an `aou_` prefix.
aou_prepared = aou_prepared.select_entries('GT')
aou_prepared = aou_prepared.select_rows(
    aou_qual=aou_prepared.qual,
    aou_filters=aou_prepared.filters,
    aou_info=aou_prepared.info,
    aou_a_index=aou_prepared.a_index,
    aou_was_split=aou_prepared.was_split,
)

aou_prepared.describe()

In [None]:
# Biallelic sites need no splitting: for efficiency they bypass split_multi_hts and simply
# receive the corresponding `a_index` / `was_split` row annotations.
ukb_bi = (
    ukb_exomes
    .filter_rows(hl.len(ukb_exomes.alleles) == 2)
    .annotate_rows(a_index=1, was_split=False)
)

# Only sites with more than two alleles are split into biallelic sites.
ukb_multi = ukb_exomes.filter_rows(hl.len(ukb_exomes.alleles) > 2)
ukb_split = hl.split_multi_hts(
    ukb_multi,
    keep_star=False,
    left_aligned=False,
    vep_root='vep',
    permit_shuffle=False,
)

# Recombine both subsets, label every column with its cohort, and key columns by (s, cohort).
ukb_prepared = ukb_split.union_rows(ukb_bi).annotate_cols(cohort='UKB')
ukb_prepared = ukb_prepared.key_cols_by('s', 'cohort')

# Keep only the GT entry field and the row fields that are needed, renamed with a `ukb_` prefix.
ukb_prepared = ukb_prepared.select_entries('GT')
ukb_prepared = ukb_prepared.select_rows(
    ukb_qual=ukb_prepared.qual,
    ukb_filters=ukb_prepared.filters,
    ukb_info=ukb_prepared.info,
    ukb_a_index=ukb_prepared.a_index,
    ukb_was_split=ukb_prepared.was_split,
)

ukb_prepared.describe()

## Compute the intersection

**Note:** This will retain the row fields from AoU but not those from UKB. We could also add an annotation step to add the UKB row fields to the intersection.

In [None]:
# Combine the columns (samples) of the two prepared matrix tables into one matrix table.
# NOTE(review): treating the result as the variant intersection relies on union_cols' default
# row join being an inner join — confirm against the installed Hail version.
intersection = aou_prepared.union_cols(ukb_prepared)

In [None]:
# Record the start time of the merged matrix table write.
start = datetime.now()
print(start)

In [None]:
# Write the merged matrix table to MERGED_MT, replacing any existing output at that path.
intersection.write(MERGED_MT, overwrite=True)

In [None]:
# Report when the merged matrix table write finished and how long it took.
end = datetime.now()
print(end)
print(end - start)

## Compute AoU - UKB

In [None]:
# Rows (variants) of the prepared AoU data whose key has no match among the prepared UKB rows.
aou_only = aou_prepared.rows().anti_join(ukb_prepared.rows())

In [None]:
# Record the start time of the AoU-only table write.
start = datetime.now()
print(start)

In [None]:
# Write the AoU-only rows to AOU_ONLY_TAB, replacing any existing output at that path.
aou_only.write(AOU_ONLY_TAB, overwrite=True)

In [None]:
# Report when the AoU-only table write finished and how long it took.
end = datetime.now()
print(end)
print(end - start)

## Compute UKB - AoU

In [None]:
# Rows (variants) of the prepared UKB data whose key has no match among the prepared AoU rows.
ukb_only = ukb_prepared.rows().anti_join(aou_prepared.rows())

In [None]:
# Record the start time of the UKB-only table write.
start = datetime.now()
print(start)

In [None]:
# Write the UKB-only rows to UKB_ONLY_TAB, replacing any existing output at that path.
ukb_only.write(UKB_ONLY_TAB, overwrite=True)

In [None]:
# Report when the UKB-only table write finished and how long it took.
end = datetime.now()
print(end)
print(end - start)

# Provenance

In [None]:
# Report the overall finish time and the total time elapsed since start_all was recorded.
end_all = datetime.now()
print(end_all)
print(end_all - start_all)

In [None]:
# Copy the Hail log to the workspace bucket so that we can retain it.
!gzip --keep {HAIL_LOG}
!gsutil cp {HAIL_LOG}.gz {HAIL_LOG_DIR_FOR_PROVENANCE}

In [None]:
# Record the time at which the provenance section was reached.
print(datetime.now())

In [None]:
# Record the installed Python package versions for provenance.
# NOTE(review): `!pip3` runs whichever pip3 is first on the shell PATH, which is presumably —
# but not necessarily — the environment the notebook kernel uses; confirm.
!pip3 freeze