3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,9 @@
* Modified `filter_mt_to_trios` to no longer filter to autosomes as this should be handled during the variant QC pipeline [(#223)](https://github.com/broadinstitute/gnomad_methods/pull/223)
* Updated `annotate_sex` to add globals to `sex_ht` [(#227)](https://github.com/broadinstitute/gnomad_methods/pull/227)
* Document `slack_notifications` function [(#228)](https://github.com/broadinstitute/gnomad_methods/pull/228)
* Added `median_impute_features` to variant QC random forest module [(#224)](https://github.com/broadinstitute/gnomad_methods/pull/224)
* Created `training.py` in variant QC and added `sample_training_examples` [(#224)](https://github.com/broadinstitute/gnomad_methods/pull/224)
* Added variant QC pipeline function `train_rf_model` [(#224)](https://github.com/broadinstitute/gnomad_methods/pull/224)

## Version 0.3.0 - April 28th, 2020

94 changes: 93 additions & 1 deletion gnomad/variant_qc/pipeline.py
@@ -1,5 +1,8 @@
import logging
from typing import Dict, Optional
from typing import Dict, List, Optional, Tuple, Union

import hail as hl
import pyspark.sql

import hail as hl

@@ -14,6 +17,12 @@
from gnomad.utils.filtering import filter_to_autosomes
from gnomad.utils.reference_genome import get_reference_genome
from gnomad.variant_qc.evaluation import compute_quantile_bin
from gnomad.variant_qc.random_forest import (
get_features_importance,
test_model,
train_rf,
)
from gnomad.variant_qc.training import sample_training_examples

logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
logger = logging.getLogger(__name__)
@@ -339,3 +348,86 @@ def generate_sib_stats(
).rows()

return sib_stats_ht


def train_rf_model(
ht: hl.Table,
rf_features: List[str],
tp_expr: hl.expr.BooleanExpression,
fp_expr: hl.expr.BooleanExpression,
fp_to_tp: float = 1.0,
num_trees: int = 500,
max_depth: int = 5,
    test_expr: Union[bool, hl.expr.BooleanExpression] = False,
) -> Tuple[hl.Table, pyspark.ml.PipelineModel]:
"""
Perform random forest (RF) training using a Table annotated with features and training data.

.. note::

This function uses `train_rf` and extends it by:
        - Adding an option to apply the resulting model to test variants that are withheld from training.
        - Using a false positive (FP) to true positive (TP) ratio to determine which variants to use for RF training.

The returned Table includes the following annotations:
- rf_train: indicates if the variant was used for training of the RF model.
- rf_label: indicates if the variant is a TP or FP.
- rf_test: indicates if the variant was used in testing of the RF model.
- features: global annotation of the features used for the RF model.
- features_importance: global annotation of the importance of each feature in the model.
- test_results: results from testing the model on variants defined by `test_expr`.

:param ht: Table annotated with features for the RF model and the positive and negative training data.
:param rf_features: List of column names to use as features in the RF training.
:param tp_expr: TP training expression.
:param fp_expr: FP training expression.
    :param fp_to_tp: Ratio of FPs to TPs for creating the RF model. If set to <= 0, all training examples are used.
:param num_trees: Number of trees in the RF model.
    :param max_depth: Maximum tree depth in the RF model.
:param test_expr: An expression specifying variants to hold out for testing and use for evaluation only.
:return: Table with TP and FP training sets used in the RF training and the resulting RF model.
"""

ht = ht.annotate(_tp=tp_expr, _fp=fp_expr, rf_test=test_expr)

rf_ht = sample_training_examples(
ht, tp_expr=ht._tp, fp_expr=ht._fp, fp_to_tp=fp_to_tp, test_expr=ht.rf_test
)
ht = ht.annotate(rf_train=rf_ht[ht.key].train, rf_label=rf_ht[ht.key].label)

summary = ht.group_by("_tp", "_fp", "rf_train", "rf_label", "rf_test").aggregate(
n=hl.agg.count()
)
logger.info("Summary of TP/FP and RF training data:")
summary.show(n=20)

logger.info(
"Training RF model:\nfeatures: {}\nnum_tree: {}\nmax_depth:{}".format(
",".join(rf_features), num_trees, max_depth
)
)

rf_model = train_rf(
ht.filter(ht.rf_train),
features=rf_features,
label="rf_label",
num_trees=num_trees,
max_depth=max_depth,
)

    test_results = None
    if test_expr is not False:
        logger.info("Testing model on specified variants or intervals...")
test_ht = ht.filter(hl.is_defined(ht.rf_label) & ht.rf_test)
test_results = test_model(
test_ht, rf_model, features=rf_features, label="rf_label"
)

features_importance = get_features_importance(rf_model)
ht = ht.select_globals(
features_importance=features_importance,
features=rf_features,
test_results=test_results,
)

return ht.select("rf_train", "rf_label", "rf_test"), rf_model
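A minimal usage sketch of `train_rf_model` (the input path, feature names, and truth-set annotations below are hypothetical placeholders, not part of this PR):

```python
import hail as hl

from gnomad.variant_qc.pipeline import train_rf_model

# Hypothetical input: a variant Table already annotated with the RF features
# and with truth-set membership flags.
ht = hl.read_table("gs://my-bucket/variant_annotations.ht")

rf_ht, rf_model = train_rf_model(
    ht,
    rf_features=["info_QD", "info_FS", "info_MQ"],
    tp_expr=ht.omni | ht.mills,            # assumed truth-set annotations
    fp_expr=ht.fail_hard_filters,          # assumed hard-filter flag
    fp_to_tp=1.0,                          # sample FPs to match the number of TPs
    test_expr=ht.locus.contig == "chr20",  # hold out chr20 for evaluation
)
rf_ht.show()
```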
69 changes: 68 additions & 1 deletion gnomad/variant_qc/random_forest.py
@@ -1,7 +1,7 @@
import logging
import pprint
from pprint import pformat
from typing import Dict, List, Tuple
from typing import Dict, List, Optional, Tuple

import hail as hl
import pandas as pd
Expand Down Expand Up @@ -159,6 +159,73 @@ def get_columns_quantiles(
return res


def median_impute_features(
ht: hl.Table, strata: Optional[Dict[str, hl.expr.Expression]] = None
) -> hl.Table:
"""
    Impute missing values of numerical features in the Table using Hail's `approx_median`.

    If a `strata` dict is given, imputation is done based on the median of each stratum.

    The following annotations are added to the Table:
- feature_imputed - A row annotation indicating if each numerical feature was imputed or not.
- features_median - A global annotation containing the median of the numerical features. If `strata` is given,
this struct will also be broken down by the given strata.
- variants_by_strata - An additional global annotation with the variant counts by strata that will only be
added if imputing by a given `strata`.

:param ht: Table containing all samples and features for median imputation.
    :param strata: Optional dict of strata to group by when computing feature medians; if given, imputation is done per stratum (default None).
:return: Feature Table imputed using approximate median values.
"""

logger.info("Computing feature medians for imputation of missing numeric values")
numerical_features = [
k for k, v in ht.row.dtype.items() if v == hl.tint or v == hl.tfloat
]

median_agg_expr = hl.struct(
**{feature: hl.agg.approx_median(ht[feature]) for feature in numerical_features}
)

if strata:
ht = ht.annotate_globals(
feature_medians=ht.aggregate(
hl.agg.group_by(hl.tuple([ht[x] for x in strata]), median_agg_expr),
_localize=False,
),
variants_by_strata=ht.aggregate(
hl.agg.counter(hl.tuple([ht[x] for x in strata])), _localize=False
),
)
        # Look up the per-stratum medians for each row using its strata key.
        feature_median_expr = ht.feature_medians[hl.tuple([ht[x] for x in strata])]
logger.info(
"Variant count by strata:\n{}".format(
"\n".join(
[
"{}: {}".format(k, v)
for k, v in hl.eval(ht.variants_by_strata).items()
]
)
)
)

else:
ht = ht.annotate_globals(
feature_medians=ht.aggregate(median_agg_expr, _localize=False)
)
feature_median_expr = ht.feature_medians

ht = ht.annotate(
**{f: hl.or_else(ht[f], feature_median_expr[f]) for f in numerical_features},
feature_imputed=hl.struct(
**{f: hl.is_missing(ht[f]) for f in numerical_features}
),
)

return ht
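As a rough usage sketch for `median_impute_features` (assumes `ht` already has a row field `variant_type` and a numeric feature `info_QD`; both names are hypothetical):

```python
# Impute each numeric feature with the median of its variant_type stratum.
ht = median_impute_features(ht, strata={"variant_type": ht.variant_type})

# Rows where the hypothetical `info_QD` feature was filled in with its stratum median:
ht.filter(ht.feature_imputed.info_QD).show()
```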


def ht_to_rf_df(
ht: hl.Table, features: List[str], label: str, index: str = None
) -> pyspark.sql.DataFrame:
118 changes: 118 additions & 0 deletions gnomad/variant_qc/training.py
@@ -0,0 +1,118 @@
import logging
from typing import List, Optional, Tuple

import hail as hl
from pprint import pformat

logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def sample_training_examples(
ht: hl.Table,
    tp_expr: hl.expr.BooleanExpression,
    fp_expr: hl.expr.BooleanExpression,
fp_to_tp: float = 1.0,
test_expr: Optional[hl.expr.BooleanExpression] = None,
) -> hl.Table:
"""
    Returns a Table of all positive and negative training examples in `ht`, with an annotation indicating which ones
    should be used for training given a true positive (TP) to false positive (FP) ratio.

    The returned Table has the following annotations:
        - train: indicates if the variant should be used for training. A row is set to False if it is True for
          `test_expr`, True for both `tp_expr` and `fp_expr`, or if it is pruned out to obtain the desired `fp_to_tp` ratio.
        - label: indicates whether a variant is a 'TP' or 'FP'; variants defined by `test_expr` also receive this label.

.. note::

- This function does not support multi-allelic variants.
        - The function will log some stats about the provided TPs/FPs (Ti, Tv, indels).

:param ht: Input Table.
:param tp_expr: Expression for TP examples.
:param fp_expr: Expression for FP examples.
:param fp_to_tp: FP to TP ratio. If set to <= 0, all training examples are used.
    :param test_expr: Optional expression to exclude a set of variants from the training set. These variants still receive the TP/FP label annotation.
:return: Table subset with corresponding TP and FP examples with desired FP to TP ratio.
"""

ht = ht.select(
_tp=hl.or_else(tp_expr, False),
_fp=hl.or_else(fp_expr, False),
_exclude=False if test_expr is None else test_expr,
)
ht = ht.filter(ht._tp | ht._fp).persist()

# Get stats about TP / FP sets
def _get_train_counts(ht: hl.Table) -> Tuple[int, int]:
"""
Determine the number of TP and FP variants in the input Table and report some stats on Ti, Tv, indels.

:param ht: Input Table
:return: Counts of TP and FP variants in the table
"""
train_stats = hl.struct(n=hl.agg.count())

if "alleles" in ht.row and ht.row.alleles.dtype == hl.tarray(hl.tstr):
train_stats = train_stats.annotate(
ti=hl.agg.count_where(
hl.expr.is_transition(ht.alleles[0], ht.alleles[1])
),
tv=hl.agg.count_where(
hl.expr.is_transversion(ht.alleles[0], ht.alleles[1])
),
indel=hl.agg.count_where(
hl.expr.is_indel(ht.alleles[0], ht.alleles[1])
),
)

# Sample training examples
pd_stats = (
ht.group_by(**{"contig": ht.locus.contig, "tp": ht._tp, "fp": ht._fp})
.aggregate(**train_stats)
.to_pandas()
)

logger.info(pformat(pd_stats))
pd_stats = pd_stats.fillna(False)

# Number of true positive and false positive variants to be sampled for the training set
n_tp = pd_stats[pd_stats["tp"] & ~pd_stats["fp"]]["n"].sum()
n_fp = pd_stats[~pd_stats["tp"] & pd_stats["fp"]]["n"].sum()

return n_tp, n_fp

n_tp, n_fp = _get_train_counts(ht.filter(~ht._exclude))

prob_tp = prob_fp = 1.0
if fp_to_tp > 0:
desired_fp = fp_to_tp * n_tp
if desired_fp < n_fp:
prob_fp = desired_fp / n_fp
else:
prob_tp = n_fp / desired_fp
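        # Illustrative (hypothetical) numbers: with n_tp = 10,000, n_fp = 50,000 and
        # fp_to_tp = 1.0, desired_fp = 10,000 < n_fp, so prob_fp = 0.2 and prob_tp
        # stays 1.0: every TP is kept and each FP is kept with probability ~0.2.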

logger.info(
f"Training examples sampling: tp={prob_tp}*{n_tp}, fp={prob_fp}*{n_fp}"
)

train_expr = (
hl.case(missing_false=True)
.when(ht._fp & hl.or_else(~ht._tp, True), hl.rand_bool(prob_fp))
.when(ht._tp & hl.or_else(~ht._fp, True), hl.rand_bool(prob_tp))
.default(False)
)
else:
train_expr = ~(ht._tp & ht._fp)
logger.info(f"Using all {n_tp} TP and {n_fp} FP training examples.")

label_expr = (
hl.case(missing_false=True)
.when(ht._tp & hl.or_else(~ht._fp, True), "TP")
.when(ht._fp & hl.or_else(~ht._tp, True), "FP")
.default(hl.null(hl.tstr))
)

return ht.select(train=train_expr & ~ht._exclude, label=label_expr)
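A minimal sketch of how `sample_training_examples` might be called (the truth-set annotation names are hypothetical):

```python
train_ht = sample_training_examples(
    ht,
    tp_expr=ht.omni | ht.mills,            # assumed truth-set annotations
    fp_expr=ht.fail_hard_filters,          # assumed hard-filter flag
    fp_to_tp=1.0,                          # roughly one FP sampled per TP
    test_expr=ht.locus.contig == "chr20",  # keep chr20 out of training
)

# `train_ht` is keyed like `ht`; copy the `train`/`label` annotations back onto it.
ht = ht.annotate(**train_ht[ht.key])
```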