This notebook reproduces the AAV design experiments whose results are shown in Fig 5.

Variable name suffixes in the following cells denote array dimensions, where

n: number of calibration and test data points  
l: number of values of the inverse temperature, lambda  
L: length of sequence  
t: number of trials of sampling from test distribution, per lambda  
s: number of samples from test distribution  
m: number of calibration data points  
m1: m + 1  

In [18]:
import os
import sys
import time
from importlib import reload
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)
    
import numpy as np
import scipy as sc
from tensorflow import keras

import assay
import calibrate as cal

## Load held-out data and parameters of test sequence distributions

In [4]:
# load held-out data (calibration and test data)
scale = 0.1
d = np.load('../aav/test_and_calibration_data_scale{}.npz'.format(scale))
seq_n = d['seq_n']  # list of strings
y_n = d['y_n']      # true fitnesses
n = y_n.size
print('Loaded {} held-out test and calibration data points.'.format(n))

Loaded 1000000 held-out test and calibration data points.


In [5]:
# load parameters of test sequence distributions
d = np.load('../aav/models/constrained_maxent_scale{}_052722.npz'.format(scale))

# phitestnuc_lxLxk[i] is an (L, k) numpy array of unnormalized probabilities of a categorical distribution
# over k = 4 nucleotides at each of L sequence positions,
# corresponding to phi in Eq. 5 of Supp. Materials and Methods here:
# https://www.biorxiv.org/content/10.1101/2021.11.02.467003v2.full
phitestnuc_lxLxk = d['phitestnuc_lxLxk']

# note that lambda in bioRxiv above corresponds to 1 / lambda for us
lambda_l = (1 / d['temperature_l']).astype(int)
meanpredfit_l = d['meanpredfit_l']

## Construct confidence sets for designed sequences

Compute predictions and scores for all held-out data (calibration and test data).

In [6]:
# load trained NN and predict for all held-out sequences
datagen = assay.DataGenerator(seq_n)
model = keras.models.load_model('../aav/models/h100_scale{}_0_052722.npy'.format(scale))
pred_n = model.predict_generator(datagen).reshape(n)
score_n = np.abs(pred_n - y_n)  # score with residual

Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
Instructions for updating:
If using Keras pass *_constraint arguments to layers.




Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where


Use rejection sampling to sample from test distribution, and construct split conformal confidence intervals for resulting designed sequences.

In [None]:
n_trial = 500
alpha = 0.1
n_cal = 10000
save_results = True
savefile = '../aav/split-results.npz'

# compute training likelihoods of all sequences
logptrain_n = assay.get_loglikelihood(seq_n, assay.PNNKAA_LXK)

n_lambda = phitestnuc_lxLxk.shape[0]
cov_lxt = np.zeros([n_lambda, n_trial])
avglen_lxt = np.zeros([n_lambda, n_trial])
fracinf_lxt = np.zeros([n_lambda, n_trial])
len_lxt = {(l, t): None for l, t in zip(range(n_lambda), range(n_trial))}
fit_lxt = {(l, t): None for l, t in zip(range(n_lambda), range(n_trial))}

for l in range(n_lambda - 1, -1, -1):
    t0 = time.time()
    print("Test distribution with lambda = {:.1f}".format(lambda_l[l]))
    
    # compute acceptance probabilities for all sequences (for rejection sampling from test distribution)
    paccept_n, logptest_n = assay.get_rejection_sampling_acceptance_probabilities(
        seq_n, phitestnuc_lxLxk[l], logptrain_n)

    # compute (unnormalized) weights for all data
    w_n = np.exp(logptest_n - logptrain_n)

    for t in range(n_trial):
        
        # partition held-out data into calibration data and test data
        # (i.e., samples from proposal distribution for rejection sampling from test distribution)
        shuffle_idx = np.random.permutation(n)
        cal_idx, test_idx = shuffle_idx[: n_cal], shuffle_idx[n_cal :]
        
        # sample from test distribution using rejection sampling
        testsamp_idx = assay.rejection_sample_from_test_distribution(paccept_n[test_idx])
        n_test = testsamp_idx.size
        if t == 0:
            print("  On trial 0, sampled {} sequences from the test distribution.".format(n_test))

        # fetch and normalize weights of calibration data
        p_sxm1 = np.hstack([np.tile(w_n[cal_idx], [n_test, 1]), w_n[test_idx[testsamp_idx]][:, None]])
        p_sxm1 /= np.sum(p_sxm1, axis=1, keepdims=True)
        
        # compute quantile of weighted calibration scores
        augscore_sxm1 = np.tile(np.hstack([score_n[cal_idx], [np.infty]]), (n_test, 1))
        q_sx1 = cal.get_weighted_quantile(1 - alpha, p_sxm1.T, augscore_sxm1.T)[:, None]
        
        # construct confidence intervals
        testpred_sx1 = pred_n[test_idx[testsamp_idx]][:, None]
        lu_sx2 =  np.hstack([testpred_sx1 - q_sx1, testpred_sx1 + q_sx1])
         
        # record confidence interval lengths, true fitnesses, and empirical coverage
        noninf_idx = np.where(np.logical_and(~np.isinf(lu_sx2[:, 0]), ~np.isinf(lu_sx2[:, 1])))[0]
        avglen_lxt[l, t] = np.mean(2 * q_sx1[noninf_idx]) if noninf_idx.size else np.nan
        fracinf_lxt[l, t] = (n_test - noninf_idx.size) / n_test
        len_lxt[(l, t)] = 2 * q_sx1.flatten()
        fit_lxt[(l, t)] = y_n[test_idx[testsamp_idx]]
        cov_lxt[l, t] = cal.get_split_coverage(lu_sx2, fit_lxt[(l, t)])
        
    print("  Empirical coverage: {:.4f}\n  Average non-inf length: {:.2f}\n  Fraction inf: {:.2f}\n  ({:.1f} s)".format(
        np.mean(cov_lxt[l]), np.nanmean(avglen_lxt[l]), np.mean(fracinf_lxt[l]), time.time() - t0))
    
    # save results after each lambda
    if save_results:
        np.savez(savefile, cov_lxt=cov_lxt, avglen_lxt=avglen_lxt,
                 fracinf_lxt=fracinf_lxt, len_lxt=len_lxt, fit_lxt=fit_lxt)

Ditto, but with randomized staircase confidence sets to achieve exact coverage (Fig. 5).

In [14]:
reload(cal)
n_trial = 500
alpha = 0.1
n_cal = 10000
save_results = True
savefile = '../aav/randomized-staircase-results.npz'

# compute training likelihoods of all sequences
logptrain_n = assay.get_loglikelihood(seq_n, assay.PNNKAA_LXK)

n_lambda = phitestnuc_lxLxk.shape[0]
avglen_lxt = np.zeros([n_lambda, n_trial])
fracinf_lxt = np.zeros([n_lambda, n_trial])
len_lxt = {(l, t): None for l, t in zip(range(n_lambda), range(n_trial))}
fit_lxt = {(l, t): None for l, t in zip(range(n_lambda), range(n_trial))}
cov_lxt = {(l, t): None for l, t in zip(range(n_lambda), range(n_trial))}

for l in range(n_lambda - 1, -1, -1):
    t0 = time.time()
    print("Test distribution with lambda = {:.1f}".format(lambda_l[l]))
    
    # compute acceptance probabilities for all sequences (for rejection sampling from test distribution)
    paccept_n, logptest_n = assay.get_rejection_sampling_acceptance_probabilities(
        seq_n, phitestnuc_lxLxk[l], logptrain_n)

    # compute (unnormalized) weights for all data
    w_n = np.exp(logptest_n - logptrain_n)

    for t in range(n_trial):
        
        # partition held-out data into calibration data and test data
        # (i.e., samples from proposal distribution for rejection sampling from test distribution)
        shuffle_idx = np.random.permutation(n)
        cal_idx, test_idx = shuffle_idx[: n_cal], shuffle_idx[n_cal :]
        
        # sample from test distribution using rejection sampling
        testsamp_idx = assay.rejection_sample_from_test_distribution(paccept_n[test_idx])
        n_test = testsamp_idx.size
        if t == 0:  # example of how many sequences are sampled from test distribution on a trial
            print("  On trial 0, sampled {} sequences from the test distribution.".format(n_test))

        # fetch and normalize weights of calibration data
        p_sxm1 = np.hstack([np.tile(w_n[cal_idx], [n_test, 1]), w_n[test_idx[testsamp_idx]][:, None]])
        p_sxm1 /= np.sum(p_sxm1, axis=1, keepdims=True)
        
        # construct randomized staircase confidence set
        testpred_s = pred_n[test_idx[testsamp_idx]]
        C_s = [cal.get_randomized_staircase_confidence_set(
            score_n[cal_idx], weights_m1, pred, alpha) for weights_m1, pred in zip(p_sxm1, testpred_s)]
         
        # record true fitnesses, empirical coverage, confidence set sizes
        fit_lxt[(l, t)] = y_n[test_idx[testsamp_idx]]
        cov_s, len_s = cal.get_randomized_staircase_coverage(C_s, fit_lxt[(l, t)])
        cov_lxt[(l, t)] = cov_s
        noninf_idx = np.where(~np.isinf(len_s))[0]
        avglen_lxt[l, t] = np.mean(len_s[noninf_idx]) if noninf_idx.size else np.nan
        fracinf_lxt[l, t] = (n_test - noninf_idx.size) / n_test
        len_lxt[(l, t)] = len_s
    
    cov = np.mean([np.mean(cov_lxt[(l, t)]) for t in range(n_trial)])
    print("  Empirical coverage: {:.4f}\n  Average non-inf length: {:.2f}\n  Fraction inf: {:.2f}\n  ({:.1f} s)".format(
        cov, np.nanmean(avglen_lxt[l]), np.mean(fracinf_lxt[l]), time.time() - t0))
        
    # save results after each lambda
    if save_results:
        np.savez(savefile, cov_lxt=cov_lxt, avglen_lxt=avglen_lxt,
                 fracinf_lxt=fracinf_lxt, len_lxt=len_lxt, fit_lxt=fit_lxt)

Test distribution with lambda = 7.0
  On trial 0, sampled 6 sequences from the test distribution.
  Average non-inf length 5.80
  Fraction inf 0.17
109.6 s
Test distribution with lambda = 6.0
  On trial 0, sampled 15 sequences from the test distribution.
  Average non-inf length 5.49
  Fraction inf 0.06
150.0 s
Test distribution with lambda = 5.0
  On trial 0, sampled 40 sequences from the test distribution.
  Average non-inf length 5.06
  Fraction inf 0.01
228.0 s
Test distribution with lambda = 4.0
  On trial 0, sampled 128 sequences from the test distribution.
  Average non-inf length 4.81
  Fraction inf 0.00
338.8 s
Test distribution with lambda = 3.0
  On trial 0, sampled 472 sequences from the test distribution.
  Average non-inf length 4.75
  Fraction inf 0.00
771.5 s
Test distribution with lambda = 2.0
  On trial 0, sampled 3233 sequences from the test distribution.
  Average non-inf length 4.69
  Fraction inf 0.00
3919.5 s
Test distribution with lambda = 1.0
  On trial 0, samp