# Estimating Narrow-sense heritability

Narrow sense heritability, $h^2$ is the ratio of variance due to average/additive effects of alleles. Where $Var(A)$ is the variance due to this addative effect and $Var(P)$ is the total phenotypic variance:

$$h^2 = \frac{Var(A)}{Var(P)}$$

## Computing $h^2$

The total variance of the phenotype includes the variance from the additive genetic component $Var(A)$, as well as a components from environemntal and non-additive genetic effects which are collectively $Var(\neg A)$. 

$$Var(P) = Var(A) + Var(\neg A)$$

We will model the additive component of the phenotype $A$ by training a linear model that will predict a phenotype value given the input genotype. The model will learn one weight per variant (i.e. SNPs from both haplotypes will be combined into a single feature with a value of 0, 1, or 2). The coefficient of determination for this model is the narrow sense heritability $h^2$, but I'll derive that from the fraction of variance unexplained for clarity.

The fraction of variance unexplained (FVU) is the fraction of variance for a target variable in a regression not explained by the model. In this case, the target variable is the phenotype and the model is the linear model of the additive genetic component. For a linear regression, this is equivalent to $ 1 - R^2 $, where $R^2$ is the coefficient of determination for the linear regression model.

$$ FVU = \frac{Var(\neg A)}{Var(P)} = 1 - R^2 $$

From here we can find that $h^2 = R^2$.

$$ Var(A) = Var(P) (1 - FVU) = Var(P) [1 - (1 - R^2)] = Var(P) R^2 $$

$$ h^2 = \frac{Var(A)}{Var(P)} = \frac{Var(P) R^2}{Var(P)} = R^2 $$

The plan is to simulate $n_{train}$ phenotypes per genotype for training and then compute $R^2$ using $n_{test}$ different simulated phenotypes per genotype, but I'm not sure if it matters.

## Implement estimation method

In [1]:
import copy

from pheno_sim import PhenoSimulation

In [2]:
def gen_config(heritability=1.0, combine_type="AdditiveCombine"):
	"""Return linear additive model config with specified heritability
	and haplotype combine node type.
	"""

	return {
		"input": [
			{
				"file": "1000_genomes_data/ALL.chr19.phase3_shapeit2_mvncall_integrated_v5b.20130502.genotypes.vcf.gz",
				"file_format": "vcf",
				"reference_genome": "GRCh37",
				"force_bgz": True,
				"input_nodes": [
					{
						"alias": "rare_variants",
						"type": "SNP",
						"chr": "19",
						"pos": [13397324, 52901080, 16023111]
					},
					{
						"alias": "common_variants",
						"type": "SNP",
						"chr": "19",
						"pos": [55555845, 55261043, 45857820, 55442280, 39369369]
					}
				]
			}
		],
		"simulation_steps": [
			{
				"type": "RandomConstant",
				"alias": "rare_variants_betas",
				"input_match_size": "rare_variants",
				"dist_name": "normal",
				"dist_kwargs": {
					"loc": 0.0,
					"scale": 0.5
				},
				"by_feat": True
			},
			{
				"type": "RandomConstant",
				"alias": "common_variants_betas",
				"input_match_size": "common_variants",
				"dist_name": "normal",
				"dist_kwargs": {
					"loc": 0.0,
					"scale": 0.2
				},
				"by_feat": True
			},
			{
				"type": "Product",
				"alias": "rare_variant_effects",
				"input_aliases": [
					"rare_variants_betas", "rare_variants"
				]
			},
			{
				"type": "Product",
				"alias": "common_variant_effects",
				"input_aliases": [
					"common_variants_betas", "common_variants"
				]
			},
			{
				"type": "Concatenate",
				"alias": "variant_effects",
				"input_aliases": [
					"rare_variant_effects", "common_variant_effects"
				],
			},
			{
				"type": combine_type,
				"alias": "combined_variant_effects",
				"input_alias": "variant_effects"
			},
			{
				"type": "SumReduce",
				"alias": "no_noise_phenotype",
				"input_alias": "combined_variant_effects"
			},
			{
				"type": "Heritability",
				"alias": "phenotype",
				"input_alias": "no_noise_phenotype",
				"heritability": heritability
			}
		]
	}

In [3]:
# Get input data
sim = PhenoSimulation(gen_config())
input_vals = sim.run_input_step()

Initializing Hail with default parameters...


Loading input data...


SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
SLF4J: Class path contains SLF4J bindings targeting slf4j-api versions 1.7.x or earlier.
SLF4J: Ignoring binding found at [jar:file:/Users/ross/miniforge3/envs/pheno_sim/lib/python3.10/site-packages/pyspark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#ignoredBindings for an explanation.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Running on Apache Spark version 3.3.2
SparkUI available at http://rosss-mbp.lan:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.118-a4ca239602bb
LOGGING: writing to /Users/ross/Desktop/gwas/CITRUS/doc/example_nbs/hail-20230809-1305-0.2.118-a4ca239602bb.log
2023-08-09 

In [4]:
import numpy as np
import pandas as pd
from scipy import stats
from sklearn import linear_model
from sklearn import model_selection
from tqdm import tqdm, trange

In [5]:
def sample_vals_dict(vals_dict, n_samples=1.0):
	"""Subsample individuals in vals dict.
	
	If n_samples is less than 1, will use that fraction of samples. If
	n_samples is greater than 1, will use that number of samples. If 
	n_samples is 1 (default), will return the same vals dict.
	
	Args:
		vals (dict): Dictionary of values.
		n_samples (default 1.0): The fraction or number of samples to
			use when estimating heritability. If value is less than or
			equal to 1, will use that fraction of samples. If value is
			greater than 1, will use that number of samples. Samples
			will be randomly selected from the input samples.
	"""
	vals_dict = copy.deepcopy(vals_dict)
	
	# Get number of sample in input.
	input_item = vals_dict[list(vals_dict.keys())[0]]
	if isinstance(input_item, tuple):
		n_input_samples = input_item[0].shape[-1]
	else:
		n_input_samples = input_item.shape[-1]

	# Shuffle and possibly subsample input data
	if n_samples > 1:
		selected_idx = np.random.choice(
			n_input_samples,
			n_samples,
			replace=False
		)
		
		for key, val in vals_dict.items():
			if isinstance(val, tuple):
				vals_dict[key] = (
					val[0][..., selected_idx], val[1][..., selected_idx]
				)
			else:
				vals_dict[key] = val[..., selected_idx]
	elif n_samples < 1:
		selected_idx = np.random.choice(
			n_input_samples,
			int(n_samples * n_input_samples),
			replace=False
		)
		
		for key, val in vals_dict.items():
			if isinstance(val, tuple):
				vals_dict[key] = (
					val[0][..., selected_idx], val[1][..., selected_idx]
				)
			else:
				vals_dict[key] = val[..., selected_idx]

	return vals_dict

In [27]:
# Estimate_narrow_heritability method
sim = PhenoSimulation(gen_config(
	heritability=0.5,
	combine_type="AdditiveCombine"
))
input_vals = input_vals
n_samples = 0.1
n_replicates = 250
n_folds = 5
n_repeats = 5
statistic_fn = np.mean
confidence_level=0.95
phenotype_alias = 'phenotype'


# Simulate and then get CV scores n_repeats times
r2_scores = []

for i in trange(n_repeats):
	# Subsample individuals if n_samples is not 1
	iter_input_vals = sample_vals_dict(input_vals, n_samples)

	# Simulate phenotypes to create labels, tracking genotype
	pheno_vals = []
	geno_idx = []

	for i in range(n_replicates):
		gen_phenos = sim.run_simulation_steps(
			copy.deepcopy(iter_input_vals)
		)[phenotype_alias]

		pheno_vals.extend(gen_phenos)
		geno_idx.extend(list(range(len(gen_phenos))))

	# Convert input values to a dataframe and stack to match n_replicates
	input_df = sim.vals_dict_to_dataframe(iter_input_vals)
	input_df = pd.concat([input_df] * n_replicates)

	# Create model and cross validation object
	linear_reg = linear_model.LinearRegression(n_jobs=-1)
	strat_group_kfold = model_selection.GroupKFold(
		n_splits=n_folds
	)

	# Get R^2 scores for each fold
	r2_scores.extend(
		model_selection.cross_val_score(
			linear_reg,
			X=input_df,
			y=pheno_vals,
			groups=geno_idx,
			scoring='r2',
			cv=strat_group_kfold,
			n_jobs=-1,
			verbose=1
		)
	)

  0%|          | 0/5 [00:00<?, ?it/s][Parallel(n_jobs=-1)]: Using backend LokyBackend with 10 concurrent workers.
[Parallel(n_jobs=-1)]: Done   5 out of   5 | elapsed:    0.5s finished
 20%|██        | 1/5 [00:00<00:02,  1.45it/s][Parallel(n_jobs=-1)]: Using backend LokyBackend with 10 concurrent workers.
[Parallel(n_jobs=-1)]: Done   5 out of   5 | elapsed:    0.1s finished
 40%|████      | 2/5 [00:00<00:01,  2.26it/s][Parallel(n_jobs=-1)]: Using backend LokyBackend with 10 concurrent workers.
[Parallel(n_jobs=-1)]: Done   5 out of   5 | elapsed:    0.4s finished
 60%|██████    | 3/5 [00:01<00:01,  1.80it/s][Parallel(n_jobs=-1)]: Using backend LokyBackend with 10 concurrent workers.
[Parallel(n_jobs=-1)]: Done   5 out of   5 | elapsed:    0.1s finished
 80%|████████  | 4/5 [00:01<00:00,  2.27it/s][Parallel(n_jobs=-1)]: Using backend LokyBackend with 10 concurrent workers.
[Parallel(n_jobs=-1)]: Done   5 out of   5 | elapsed:    0.1s finished
100%|██████████| 5/5 [00:02<00:00,  2.29it/

In [28]:
# Compute bootstrapped confidence intervals
ci = stats.bootstrap(
    (r2_scores,), 
    statistic_fn,
    confidence_level=confidence_level,
    # num_iterations=1000
)

median_r2 = statistic_fn(r2_scores)
lower_ci = ci.confidence_interval.low
upper_ci = ci.confidence_interval.high

print(f"{statistic_fn.__name__.title()} R2: {median_r2:.3f}")
print(f"95% CI: [{lower_ci:.3f}, {upper_ci:.3f}]")

Mean R2: 0.483
95% CI: [0.462, 0.504]
