<a href="https://colab.research.google.com/github/Yizhi-Liang/VOI-QBA/blob/main/01_net_benefit.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import pandas as pd
import numpy as np

!pip install -q lets-plot
from lets_plot import *
LetsPlot.setup_html()

In [3]:
# 1. Parameters and Setups

# simulation settings
n_sim = 10000
np.random.seed(42)

# economic and utility
wtp = 150000          # Lambda ($/QALY)
qaly_no_event = 0.8
qaly_event = 0.2
cost_sc = 50000
cost_nt = 60000

# true values
p_sc = 0.3
rr_prior_mean = 0.5
theta_prior_mean = np.log(rr_prior_mean) # -0.693

# Variance of theta function
def get_var_theta(theta, n_sc, n_nt, p_sc=p_sc):
    rr = np.exp(theta)
    p_nt = p_sc * rr

    a = n_sc * p_sc
    var_sc = 1/a - 1/n_sc
    c = n_nt * p_nt
    var_nt = 1/c - 1/n_nt

    return var_nt + var_sc

theta_prior_var = get_var_theta(
    p_sc = p_sc,
    theta = theta_prior_mean,
    n_sc = 100,
    n_nt = 100)

# study designs
## policy 1: trial
trial_cost=120*1e6
n_trial = 400
n_trial_sc = 0.5*n_trial
n_trial_nt = 0.5*n_trial
tau_trial = 4 # years to results

## polity 2: RWE
bias_b = 0.3
n_rwe=4000
n_rwe_sc=0.5*n_rwe
n_rwe_nt=0.5*n_rwe
delta_tau_rwe=2 # 2 years advance in time

# population
I_t = 10000 # annual cohort
total_yrs = 10 #  time horizon
discount_rate = 0.03
update_rate = 0.3

In [4]:
# 2. Helper Functions

# Posterior update (normal-normal)
def get_posterior_mean_variance(prior_mean, prior_var, data_mean, data_var):

    # Normal-Normal Conjugate
    prior_prec = 1.0 / prior_var
    data_prec = 1.0 / data_var
    post_prec = prior_prec + data_prec

    post_var = 1.0 / post_prec
    post_mean = post_var * (prior_mean * prior_prec + data_mean * data_prec)

    return post_mean, post_var

  # Net benefit
def calculate_net_benefit(theta, cost, qaly_no_event=qaly_no_event, qaly_event=qaly_event, p_sc=p_sc):
    """Calculates NB = E[QALYs]*WTP - Cost"""
    prob_event = np.exp(theta) * p_sc
    expected_qaly = (1 - prob_event)*qaly_no_event + prob_event*qaly_event
    return expected_qaly * wtp - cost

In [5]:
# Nested benefit
def calculate_nested_benefit(bias_b=bias_b, n_sim=n_sim):
    # Net Benefit of SC (constant)
    nb_sc = calculate_net_benefit(theta=0, cost=cost_sc)

    # Prior Only World
    theta_prior = np.random.normal(theta_prior_mean, np.sqrt(theta_prior_var), n_sim)

    # --- Policy 1: Trial Strategy ---
    # 1. Generate Unbiased Trial Data (X_bar)
    var_trial = get_var_theta(theta_prior, n_trial_sc, n_trial_nt)
    X_bar = np.random.normal(theta_prior, np.sqrt(var_trial))

    # 2. Decision Making (Posterior Belief based on X_bar)
    post_mean_trial, post_var_trial = get_posterior_mean_variance(
        prior_mean=theta_prior_mean,
        prior_var=theta_prior_var,
        data_mean=X_bar,
        data_var=var_trial
    )

    # Calculate Expected NB from the Decision Maker's perspective
    dec_samples_trial = np.random.normal(
        loc=post_mean_trial[:, np.newaxis],
        scale=np.sqrt(post_var_trial[:, np.newaxis]),
        size=(n_sim, n_sim)
    )
    # Perceived Expected NB of NT (Average across posterior samples, axis=1)
    expected_nb_nt_trial = np.mean(calculate_net_benefit(dec_samples_trial, cost_nt), axis=1)

    # 3. Decision Rule: Choose NT if Expected NB > NB_SC
    choose_nt_trial = expected_nb_nt_trial > nb_sc

    # 4. Realized Payoff (Use TRUE NB)
    payoff_trial = np.where(choose_nt_trial, expected_nb_nt_trial, nb_sc)
    nb_trial = np.mean(payoff_trial)

    # 5. Before trial disclosure and during the trial
    nb_nt_trial=np.mean(expected_nb_nt_trial)

    # --- Policy 2: RWE Strategy ---
    # 1. Hypothetical Trial with sample sizes as RWE
    hypo_var_trial = get_var_theta(theta_prior, n_rwe_sc, n_rwe_nt)
    hypo_X_bar = np.random.normal(theta_prior, np.sqrt(hypo_var_trial))

    # 2. Generate Biased RWE Data (R_bar)
    biased_mean_true = hypo_X_bar + np.log(bias_b)
    var_rwe = get_var_theta(biased_mean_true, n_rwe_sc, n_rwe_nt)
    R_bar = np.random.normal(biased_mean_true, np.sqrt(var_rwe))

    # 3. Decision Making (Posterior Belief based on R_bar)
    post_mean_rwe, post_var_rwe = get_posterior_mean_variance(
        prior_mean=theta_prior_mean,
        prior_var=theta_prior_var,
        data_mean=R_bar,
        data_var=var_rwe
    )

    # Calculate Expected NB from Decision Maker's perspective
    dec_samples_rwe = np.random.normal(
        loc=post_mean_rwe[:, np.newaxis],
        scale=np.sqrt(post_var_rwe[:, np.newaxis]),
        size=(n_sim, n_sim)
    )
    # Perceived Expected NB of NT
    expected_nb_nt_rwe = np.mean(calculate_net_benefit(dec_samples_rwe, cost_nt), axis=1)

    # 3. Decision Rule
    choose_nt_rwe = expected_nb_nt_rwe > nb_sc

    # 4. Realized Payoff
    payoff_rwe = np.where(choose_nt_rwe, expected_nb_nt_rwe, nb_sc)
    nb_rwe = np.mean(payoff_rwe)

    # 5. Before disclosure of RWE
    nb_nt_rwe = np.mean(expected_nb_nt_rwe)

    return nb_sc, nb_nt_trial, nb_nt_rwe, nb_trial, nb_rwe

In [6]:
# Policy Net Benefit
def get_policy_nb(bias_b=bias_b, delta_tau_rwe=delta_tau_rwe, trial_cost=trial_cost, n_sim=n_sim):

  # NBs
  nb_sc, nb_nt_trial, nb_nt_rwe, nb_trial, nb_rwe = calculate_nested_benefit(bias_b=bias_b, n_sim=n_sim)

  # --- Policy 1: Trial ---
  # Trial population payoff (during trial)
  trial_pop_nb = n_trial_sc * nb_sc + n_trial_nt * nb_nt_trial - trial_cost

  # Population waiting for trial results (Standard of Care)
  discounted_pop_waiting = np.sum([I_t / ((1 + discount_rate)**t) for t in range(1, tau_trial + 1)])
  before_trial_pop_nb = (discounted_pop_waiting+I_t-n_trial) * nb_sc

  # Population after trial results (t > tau_trial)
  discounted_pop_after = np.sum([I_t / ((1 + discount_rate)**t) for t in range(tau_trial + 1, total_yrs)])
  expected_nb_after = update_rate * nb_trial + (1 - update_rate) * nb_sc
  after_trial_pop_nb = discounted_pop_after * expected_nb_after

  # Total Policy 1 Payoff
  policy_1_total = trial_pop_nb + before_trial_pop_nb + after_trial_pop_nb

  # --- Policy 2: RWE ---
  # 1. Before RWE results
  cutoff_rwe = tau_trial - delta_tau_rwe
  discounted_pop_before_rwe = np.sum([I_t / ((1 + discount_rate)**t) for t in range(0, cutoff_rwe + 1)])
  expected_nb_before_rwe = update_rate * nb_nt_rwe + (1 - update_rate) * nb_sc
  policy_2_before = discounted_pop_before_rwe * expected_nb_before_rwe

  # 2. After RWE results (t > tau - delta_tau)
  discounted_pop_after_rwe = np.sum([I_t / ((1 + discount_rate)**t) for t in range(cutoff_rwe + 1, total_yrs)])
  expected_nb_after_rwe = update_rate * nb_rwe + (1 - update_rate) * nb_sc
  policy_2_after = discounted_pop_after_rwe * expected_nb_after_rwe

  policy_2_total = policy_2_before + policy_2_after

  return policy_1_total, policy_2_total

In [7]:
scaler=1.3

# Add font family configuration
times_new_roman_theme = theme(
    # title=element_text(size=12*scaler, family="Times New Roman"),
    axis_text_x=element_text(size=10*scaler, family="Times New Roman", angle=0),
    axis_text_y=element_text(size=10*scaler, family="Times New Roman"),
    legend_text=element_text(size=10*scaler, family="Times New Roman"),
    legend_position="bottom",  # Move legend to the bottom
    legend_direction="horizontal",
    legend_title=element_text(size=12*scaler, family="Times New Roman"),
    plot_title=element_text(size=16, family="Times New Roman", face = "bold"),
    plot_subtitle=element_text(size=14, family="Times New Roman"),
    axis_title_x=element_text(size=12*scaler, family="Times New Roman"),
    axis_title_y=element_text(size=12*scaler, family="Times New Roman"),
    text=element_text(size=8, family="Times New Roman")
)

In [8]:
# Figure 1: Expected Net Benefits under RWE bias
# Data Preparation

bias_b_range = np.arange(1.0, 4.0, 0.05)
results = []
for b in bias_b_range:
    p1, p2 = get_policy_nb(bias_b=b, n_sim=n_sim)
    results.append({'bias_b': b, 'Policy 1: Waiting for Trial': p1, 'Policy 2: Conditional Adoption': p2})

df_res = pd.DataFrame(results)

# Convert to Billions
df_res['Policy 1: Waiting for Trial'] = df_res['Policy 1: Waiting for Trial'] / 1e9
df_res['Policy 2: Conditional Adoption'] = df_res['Policy 2: Conditional Adoption'] / 1e9

df_plot1 = pd.melt(df_res, id_vars=['bias_b'],
                  value_vars=['Policy 1: Waiting for Trial', 'Policy 2: Conditional Adoption'],
                  var_name='Policy Scenario', value_name='Net Benefit')

# Find intersection using linear interpolation
diffs = df_res['Policy 1: Waiting for Trial'] - df_res['Policy 2: Conditional Adoption']
idx = np.where(np.diff(np.sign(diffs)))[0]
intersection_x = None

if len(idx) > 0:
    i = idx[0]
    x1, x2 = df_res.bias_b.iloc[i], df_res.bias_b.iloc[i+1]
    y1, y2 = diffs.iloc[i], diffs.iloc[i+1]
    intersection_x = x1 + (x2 - x1) * (0 - y1) / (y2 - y1)

In [9]:
# Figure 1: Expected Net Benefits under RWE bias
# Plotting

# Custom colors
colors = {
    'Policy 1: Waiting for Trial': '#E41A1B',
    'Policy 2: Conditional Adoption': '#377EB8'
}

plot1 = (
    ggplot(df_plot1, aes(x="bias_b", y="Net Benefit", color="Policy Scenario"))
    + geom_line(size=1.0)
    + scale_color_manual(values=colors)
    + scale_x_continuous(name="RWE Bias Factor (B)", format=".1f")
    + scale_y_continuous(name="Population Expected Net Benefit ($ Billions)", format=".2f")
    + theme_bw()
    + times_new_roman_theme
    + theme(legend_title=element_blank())
)

if intersection_x:
    plot1 += geom_vline(xintercept=intersection_x, linetype='dashed', color='black')
    y_mid = (df_plot1['Net Benefit'].max() + df_plot1['Net Benefit'].min()) / 2
    plot1 += geom_text(x=intersection_x + 0.03, y=y_mid+0.08, label=f"PIB ≈ {intersection_x:.2f}",
                       color='black', hjust=0, vjust=0)

plot1

In [10]:
# Figure 2: PIB using QBA
# Data Preparation

# Define the bias funciton
def get_bias(p_u_sc, rr_ux, rr_uy):
    if p_u_sc <= 0:
        raise ValueError("p_u_sc must be > 0.")
    num = (1.0 / p_u_sc) + rr_ux * (rr_uy - 1.0)
    den = (1.0 / p_u_sc) + (rr_uy - 1.0)
    return num / den

p_u_sc_const = 0.2
rr_ux_min, rr_ux_max = 1.0, 7.1
rr_uy_min, rr_uy_max = 1.0, 7.1
step = 0.02

rr_ux_vals = np.round(np.arange(rr_ux_min, rr_ux_max + 1e-12, step), 6)
rr_uy_vals = np.round(np.arange(rr_uy_min, rr_uy_max + 1e-12, step), 6)

df_plot2 = pd.DataFrame({
    "rr_ux": np.repeat(rr_ux_vals, len(rr_uy_vals)),
    "rr_uy": np.tile(rr_uy_vals, len(rr_ux_vals)),
})
df_plot2["bias"] = get_bias(p_u_sc_const, df_plot2["rr_ux"].to_numpy(), df_plot2["rr_uy"].to_numpy())

a = 1.0 / p_u_sc_const
rr_ux_curve = np.linspace(max(rr_ux_min, intersection_x + 1e-3), rr_ux_max, 600)
rr_uy_curve = 1.0 + a * (intersection_x - 1.0) / (rr_ux_curve - intersection_x)

curve = pd.DataFrame({"rr_ux": rr_ux_curve, "rr_uy": rr_uy_curve})
curve = curve[(curve["rr_uy"] >= rr_uy_min) & (curve["rr_uy"] <= rr_uy_max)]

In [11]:
# Figure 2: PIB using QBA

# Colors from plot1
color_p1 = '#E41A1B' # Red (Policy 1 - Favor Trial)
color_p2 = '#377EB8' # Blue (Policy 2 - Favor RWE)

# Calculate arrow target on the curve
a_param = 1.0 / p_u_sc_const
x_arrow = 4.0
y_arrow = 1.0 + a_param * (intersection_x - 1.0) / (x_arrow - intersection_x)

# Text position
x_text = 4.6
y_text = 4.5

# Plotting
plot2 = (
    ggplot(df_plot2, aes("rr_ux", "rr_uy"))
    + geom_tile(aes(fill="bias"))
    + scale_fill_gradient2(
        low=color_p2,
        mid='white',
        high=color_p1,
        midpoint=intersection_x,
        name='RWE Bias Factor (B)'
    )
    + geom_line(data=curve, mapping=aes("rr_ux", "rr_uy"), color="black", size=1.2, linetype='dashed')
    + geom_text(x=x_text, y=y_text, label=f"PIB ≈ {intersection_x:.2f}", color="black")
    + geom_segment(x=x_text-0.2, y=y_text-0.1, xend=x_arrow, yend=y_arrow,
                   arrow=arrow(type='closed', length=15, angle=20), color="black", size=0.5)

    + coord_cartesian(xlim=(rr_ux_min, rr_ux_max), ylim=(rr_uy_min, rr_uy_max))
    + labs(
        x="Confounder-Treatment Association",
        y="Confounder-Outcome Association"
    )
    + scale_x_continuous(format='.1f')
    + scale_y_continuous(format='.1f')
    + theme_bw()
    + times_new_roman_theme
    + theme(legend_text=element_text(face="bold"))
)

plot2

In [12]:
# Figure 3: Faceted Plot (3x3)
# Data Preparation

# Parameters for the grid
timing_options = [1, 2, 3]
cost_options = [80*1e6, 120*1e6, 160*1e6]
cost_labels = {80*1e6: "Low ($80M)", 120*1e6: "Mid ($120M)", 160*1e6: "High ($160M)"}

records_grid = []
intersections_grid = []

bias_range_grid = np.arange(1.0, 4.0, 0.05)

for dt in timing_options:
    for cost in cost_options:
        p1_vals = []
        p2_vals = []
        biases = []

        # Generate data for this panel
        for b in bias_range_grid:
            p1, p2 = get_policy_nb(bias_b=b, delta_tau_rwe=dt, trial_cost=cost, n_sim=n_sim)

            val_p1 = np.mean(p1) / 1e9
            val_p2 = np.mean(p2) / 1e9

            p1_vals.append(val_p1)
            p2_vals.append(val_p2)
            biases.append(b)

            records_grid.append({
                "bias_b": b,
                "Net Benefit": val_p1,
                "Policy": "Policy 1: Trial",
                "Timing": f"Advantage: {dt} Year(s)",
                "Cost": cost_labels[cost]
            })
            records_grid.append({
                "bias_b": b,
                "Net Benefit": val_p2,
                "Policy": "Policy 2: RWE",
                "Timing": f"Advantage: {dt} Year(s)",
                "Cost": cost_labels[cost]
            })

        # Find intersection for this panel
        diffs = np.array(p1_vals) - np.array(p2_vals)
        idx = np.where(np.diff(np.sign(diffs)))[0]
        if len(idx) > 0:
            i = idx[0]
            x1, x2 = biases[i], biases[i+1]
            y1, y2 = diffs[i], diffs[i+1]
            if not np.isclose(y2 - y1, 0.0):
                sect_x = x1 + (x2 - x1) * (0 - y1) / (y2 - y1)

                # Store intersection data for geom_vline and geom_text
                intersections_grid.append({
                    "Timing": f"Advantage: {dt} Year(s)",
                    "Cost": cost_labels[cost],
                    "x_intercept": sect_x,
                    "label": f"PIB≈{sect_x:.2f}"
                })

df_grid = pd.DataFrame(records_grid)
df_int = pd.DataFrame(intersections_grid)

In [13]:
# Figure 3: Faceted Plot (3x3)
# Plotting

# Re-create dataframes from the original lists to ensure clean state
df_grid = pd.DataFrame(records_grid)
df_int = pd.DataFrame(intersections_grid)

# Explicitly enforce order and sort for plotting
cost_order = ["Low ($80M)", "Mid ($120M)", "High ($160M)"]

df_grid['Cost'] = pd.Categorical(df_grid['Cost'], categories=cost_order, ordered=True)
df_grid = df_grid.sort_values(by=['Timing', 'Cost'])

df_int['Cost'] = pd.Categorical(df_int['Cost'], categories=cost_order, ordered=True)
df_int = df_int.sort_values(by=['Timing', 'Cost'])

plot3 = (
    ggplot(df_grid, aes(x="bias_b", y="Net Benefit", color="Policy"))
    + geom_line(size=1.0)
    + facet_grid(x="Cost", y="Timing", x_order=0)
    + scale_color_manual(values=colors)

    # Add vertical lines for PIB
    + geom_vline(data=df_int, mapping=aes(xintercept="x_intercept"), linetype="dashed", color="black")
    # Add Text labels
    + geom_text(data=df_int, mapping=aes(x="x_intercept", label="label"),
                y=df_grid["Net Benefit"].min(), color="black", hjust=-0.2, vjust=-0.5, size=8)

    + labs(x="RWE Bias Factor (B)",
           y="Population Expected Net Benefit ($ Billions)")
    + theme_bw()
    + times_new_roman_theme
    + theme(legend_title=element_blank())
)

plot3

In [14]:
!pip install -q Pillow
figure_path = "/content/drive/MyDrive/Research/02_VOI_RWE_QBA/03_output/figure"

ggsave(plot=plot1, filename=f"{figure_path}/01_NB_Bias.pdf")

ggsave(plot=plot2, filename=f"{figure_path}/02_Bias_Interaction.pdf")

ggsave(plot=plot3, filename=f"{figure_path}/03_Time_Advantage.pdf")

'/content/drive/MyDrive/Research/02_VOI_RWE_QBA/03_output/figure/03_Time_Advantage.pdf'