<a href="https://colab.research.google.com/github/RodrigoZepeda/ShinglesDeltaMethod/blob/main/DeltaMethod_PIF_Shingles.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Delta Method for Potential Impact Fraction

The following code contains the calculation necesary for estimating the variance of the potential impact fraction (PIF) with binary relative risks. It assumes the variance of the log relative risk is known as well as the variance of the proportion of individuals exposed.

In [1]:
%pip install --quiet "session_info"

## Problem definition

Let $p$ denote the probability of being exposed. Let $RR$ denote the relative risk of the exposure and $\ell = \ln RR$ the logarithm of that relative risk (no exposure has relative risk $=1$). Define:
\begin{equation}
\begin{aligned}
\mu^{\textrm{obs}} & = p\cdot \exp \big(\ln (RR)\big) + (1 - p)\cdot 1 = 1 + p\cdot (\exp(\ell) - 1)\\
\mu^{\textrm{cft}} & = 1 + p_* \cdot (\exp(\ell) - 1)\\
\end{aligned}
\end{equation}
Define the potential impact fraction as:
\begin{equation}
\text{PIF} = \dfrac{\mu^{\textrm{obs}}  - \mu^{\textrm{cft}}}{\mu^{\textrm{obs}}} = 1 - \dfrac{\mu^{\textrm{cft}}}{\mu^{\textrm{obs}}}
\end{equation}

For our scenarios we will only consider counterfactuals that **reduce** the relative risk, hence:
$$
0 <  \mu_{cft} \leq \mu_{obs}.
$$

Finally consider $\hat{p}$ an estimator of $p$ with variance $\sigma_{\hat{p}}^2$ and $\widehat{\text{RR}}$ an estimate of $\text{RR}$. Assume the variance that is known is for $\hat{\ell} = \widehat{\log \text{RR}}$ and given by $\sigma^2_{\ell}$. The plug-in estimator of the potential impact fraction, $\widehat{\text{PIF}}$, is:
\begin{equation}
\widehat{\text{PIF}} = 1 - \dfrac{\hat{\mu}^{\textrm{cft}}}{\hat{\mu}^{\textrm{obs}}}
\end{equation}
with
\begin{equation}
\begin{aligned}
\hat{\mu}^{\textrm{obs}} & = 1 + \hat{p}\cdot \big(\exp(\widehat{\ell}) - 1\big)\\
\hat{\mu}^{\textrm{cft}} & = 1 + p_* \cdot \big(\exp(\widehat{\ell}) - 1\big)\\
\end{aligned}
\end{equation}

The following Python code helps set up the expressions

In [2]:
#We'll use the sympy library for symbolic computation of variances and expected values
import sympy
from sympy import *
from sympy.stats import Variance, Expectation, Covariance
from sympy.stats.rv import RandomSymbol
from sympy import exp
from scipy.stats import norm
import session_info
import numpy as np
import pandas as pd
from datetime import datetime


print(sympy.__version__)   #version of sympy should be >=1.10

#Create the constants
p = Symbol('p', real=True, positive=True)            #Proportion exposed
p_star = Symbol('p_star', real=True, positive=True)  #Proportion exposed under counterfactual
L = Symbol('L', real=True, positive=True)            #Log Relative risk of exposed (i.e. relative risk = exp(L))

#Create the random symbols for RR and p hat
p_hat = RandomSymbol('p_hat')             #Estimate of p
L_hat = RandomSymbol('L_hat')             #Estimate of log(RR)

sigma_p = Symbol('sigma_p^2', real=True, positive=True) #Variance of p_hat
sigma_l = Symbol('sigma_l^2', real=True, positive=True) #Variance of log(RR_hat)

1.13.3


## Delta method estimation

Here we construct a function that applies the delta method to any function of the potential impact fraction

In [3]:
#Potential impact fraction definition
def pif(L, p, p_star, sigma_p, sigma_l):
  mu_cft = 1 + p_star*(sympy.exp(L) - 1)
  mu_obs = 1 + p*(sympy.exp(L) - 1)
  return 1 - mu_cft/mu_obs

#Replace Cov(1, X) = 0, Cov(1, 1) = 0.0 and Var(X + constant) = Var(X)
def simplify_covariance(expr):
    expr = expr.subs(
        Covariance(1, p_hat), 0
    ).subs(
        Covariance(1, L_hat), 0
    ).subs(
        Covariance(1, 1), 0
    ).subs(
        Variance(-p + p_hat), Variance(p_hat)
    ).subs(
        Variance(-L + L_hat), Variance(L_hat)
    )
    return expr

#Replace Var(X) = sigma_X and Cov(X,Y) = 0 as p and L are independent
def expand_variance(expr):
    expr = expr.subs(
        Variance(p_hat), sigma_p
    ).subs(
        Variance(L_hat), sigma_l
    ).subs(
        Covariance(p_hat, L_hat), 0.0
    )
    return expr


def delta_method(pif_function, substitute_mu=False):
  """
  Function that applies the delta method to a function of pif

  The pif function should have at least two variables: p and L
  corresponding to the proportion infected and the log relative risk
  """

  #Obtain the direction vector
  v = Matrix([L_hat - L, p_hat - p])

  #And calculate the gradient
  grad = derive_by_array(pif_function(L, p, p_star, sigma_p, sigma_l), [L, p])
  grad = Matrix(grad)

  #Thus calculating the Hadamard (directional) derivative:
  gradient_directed = grad.dot(v)

  fun_variance = Variance(gradient_directed).expand().simplify()

  #Simplify the variance
  fun_variance = simplify_covariance(fun_variance)

  #Expand my substituting the sigmas
  fun_variance = expand_variance(fun_variance)

  #Simplify the denominator
  fun_variance = fun_variance.replace(
    fraction(fun_variance)[1],fraction(fun_variance)[1].simplify()
  )

  return fun_variance.simplify()


# Estimation of the delta method

After applying the above functions we obtain that an approximation of the variance is given by:

In [4]:
simplified_pif = delta_method(pif, False)
display(simplified_pif)
simplified_pif

sigma_l^2*(-p + p_star)**2*exp(2*L)/(-p*exp(L) + p - 1)**4 + sigma_p^2*(1 - exp(L))**2*(-p_star*exp(L) + p_star - 1)**2/(-p*exp(L) + p - 1)**4

sigma_l^2*(-p + p_star)**2*exp(2*L)/(-p*exp(L) + p - 1)**4 + sigma_p^2*(1 - exp(L))**2*(-p_star*exp(L) + p_star - 1)**2/(-p*exp(L) + p - 1)**4

In [5]:
print(simplified_pif)

sigma_l^2*(-p + p_star)**2*exp(2*L)/(-p*exp(L) + p - 1)**4 + sigma_p^2*(1 - exp(L))**2*(-p_star*exp(L) + p_star - 1)**2/(-p*exp(L) + p - 1)**4


In [6]:
#Transform the previous results to function
fun_dm_pif = lambdify([L, p, p_star, sigma_p, sigma_l], simplified_pif)

In [7]:
#Calculate a specific PIF
def pif_calculator(L, p, p_star, sigma_p, sigma_l):
  pif_value = pif(L, p, p_star, sigma_p, sigma_l)
  variance = fun_dm_pif(L, p, p_star, sigma_p, sigma_l)
  confint = norm.ppf(0.975)*sqrt(variance)
  return 100*pif_value, 100*(pif_value - confint), 100*(pif_value + confint), variance

def case_calculator(cases, pif, sigma_cases, sigma_pif, scale=1000000/1000):
  variance = sigma_cases*sigma_pif + sigma_cases*pow(pif, 2) + sigma_pif*pow(cases,2)
  confint = norm.ppf(0.975)*sqrt(variance)
  return scale*pif*cases, scale*(pif*cases - confint), scale*(pif*cases + confint), pow(scale,2)*variance

def get_var(UCL_RR, LCL_RR):
  return pow((UCL_RR - LCL_RR) / (2 * norm.ppf(0.975)),2)

#Get SE^2 from confidence intervals
def get_var_log(UCL_RR, LCL_RR):
  return get_var(log(UCL_RR), log(LCL_RR))


## PIF and prevented cases

In this section we use the attributable dementia cases from [Rajan et al](https://alz-journals.onlinelibrary.wiley.com/doi/10.1002/alz.12362) as well as the relative risk estimates from both Wu and yin to estimate the impact fraction and the cases.

In [8]:
#Global values
pval    = 0.438
sigma_p_val = pow(0.0184,2)

#Attributable cases
#From https://alz-journals.onlinelibrary.wiley.com/doi/10.1002/alz.12362
df = pd.DataFrame({
    'year': [2025, 2030, 2040, 2050, 2060],
    "cases": [7.16, 8.53, 11.16, 12.73, 13.85],
    'cases_low': [6.78, 8.07, 10.55, 11.99, 12.98],
    'cases_up': [7.55, 8.99, 11.77, 13.46, 14.71],
    'cft_increase': [1.0, 1.15, 1.30, 1.45, 1.60],
})

#Get coverage
df['coverage'] = 100*pval*df['cft_increase']

#Get incidence
df['incidence'] = df['cases'].diff().fillna(0)

#Get the variance of the cases from the confidence interval
df['variance'] = df.apply(lambda row: get_var(row['cases_up'], row['cases_low']), axis=1)

#Variance of incidence (upper bound)
df['variance_incidence'] = pow((df['variance'] + df['variance'].shift(1, fill_value=0)),2)

#Remove year 2025
df = df[df["year"] > 2025]

#1) WU PIF
UCL_RR  = 0.72
LCL_RR  = 0.67
RR_val  = 0.69
sigma_wu_val = get_var_log(UCL_RR, LCL_RR)

#Get variance of difference
pifs = ['pif_wu','pif_wu_low','pif_wu_up','pif_wu_variance']
df[pifs] = df.apply(
    lambda row: pif_calculator(np.log(RR_val), pval, row["cft_increase"]*pval, sigma_p_val, sigma_wu_val),
    axis=1, result_type='expand')

cases = ['cases_wu','cases_wu_low','cases_wu_up','cases_wu_variance']
df[cases] = df.apply(
    lambda row: case_calculator(row['incidence'], row['pif_wu'] / 100.0, row['variance_incidence'], row['pif_wu_variance']),
    axis=1, result_type='expand')

pifs_ideal = ['pif_wu_ideal','pif_wu_ideal_low','pif_wu_ideal_up','pif_wu_ideal_variance']
df[pifs_ideal] = df.apply(
    lambda row: pif_calculator(np.log(RR_val), pval, 1.0, sigma_p_val, sigma_wu_val),
    axis=1, result_type='expand')

cases_ideal = ['cases_wu_ideal','cases_wu_ideal_low','cases_wu_ideal_up','cases_wu_ideal_variance']
df[cases_ideal] = df.apply(
    lambda row: case_calculator(row['incidence'], row['pif_wu_ideal'] / 100.0, row['variance_incidence'], row['pif_wu_ideal_variance']),
    axis=1, result_type='expand')

#2) YIN PIF
UCL_RR  = 0.76
LCL_RR  = 0.66
RR_val  = 0.71
sigma_yin_val = get_var_log(UCL_RR, LCL_RR)

pifs = ['pif_yin','pif_yin_low','pif_yin_up','pif_yin_variance']
df[pifs] = df.apply(
    lambda row: pif_calculator(np.log(RR_val), pval, row["cft_increase"]*pval, sigma_p_val, sigma_yin_val),
    axis=1, result_type='expand')

cases = ['cases_yin','cases_yin_low','cases_yin_up','cases_yin_variance']
df[cases] = df.apply(
    lambda row: case_calculator(row['incidence'], row['pif_yin'] / 100.0, row['variance_incidence'], row['pif_yin_variance']),
    axis=1, result_type='expand')

pifs_ideal = ['pif_yin_ideal','pif_yin_ideal_low','pif_yin_ideal_up','pif_yin_ideal_variance']
df[pifs_ideal] = df.apply(
    lambda row: pif_calculator(np.log(RR_val), pval, 1.0, sigma_p_val, sigma_yin_val),
    axis=1, result_type='expand')

cases_ideal = ['cases_yin_ideal','cases_yin_ideal_low','cases_yin_ideal_up','cases_yin_ideal_variance']
df[cases_ideal] = df.apply(
    lambda row: case_calculator(row['incidence'], row['pif_yin_ideal'] / 100.0, row['variance_incidence'], row['pif_yin_ideal_variance']),
    axis=1, result_type='expand')

In [9]:
#Format by rounding
df[['pif_wu','pif_wu_low','pif_wu_up','pif_wu_ideal','pif_wu_ideal_low','pif_wu_ideal_up']] = df[['pif_wu','pif_wu_low','pif_wu_up','pif_wu_ideal','pif_wu_ideal_low','pif_wu_ideal_up']].map(lambda x: f"{x:,.2f}")
df[['cases_wu','cases_wu_low','cases_wu_up','cases_wu_ideal','cases_wu_ideal_low','cases_wu_ideal_up']] = df[['cases_wu','cases_wu_low','cases_wu_up','cases_wu_ideal','cases_wu_ideal_low','cases_wu_ideal_up']].map(lambda x: f"{x:,.0f}")

df[['pif_yin','pif_yin_low','pif_yin_up','pif_yin_ideal','pif_yin_ideal_low','pif_yin_ideal_up']] = df[['pif_yin','pif_yin_low','pif_yin_up','pif_yin_ideal','pif_yin_ideal_low','pif_yin_ideal_up']].map(lambda x: f"{x:,.2f}")
df[['cases_yin','cases_yin_low','cases_yin_up','cases_yin_ideal','cases_yin_ideal_low','cases_yin_ideal_up']] = df[['cases_yin','cases_yin_low','cases_yin_up','cases_yin_ideal','cases_yin_ideal_low','cases_yin_ideal_up']].map(lambda x: f"{abs(x):,.0f}")

#Make just one column
df['pif_yin_formatted'] = df.apply(
    lambda row: f"{row['pif_yin']} ({row['pif_yin_low']}, {row['pif_yin_up']})",
    axis=1
)

df['pif_yin_ideal_formatted'] = df.apply(
    lambda row: f"{row['pif_yin_ideal']} ({row['pif_yin_ideal_low']}, {row['pif_yin_ideal_up']})",
    axis=1
)


df['pif_wu_formatted'] = df.apply(
    lambda row: f"{row['pif_wu']} ({row['pif_wu_low']}, {row['pif_wu_up']})",
    axis=1
)

df['pif_wu_ideal_formatted'] = df.apply(
    lambda row: f"{row['pif_wu_ideal']} ({row['pif_wu_ideal_low']}, {row['pif_wu_ideal_up']})",
    axis=1
)

df['cases_yin_formatted'] = df.apply(
    lambda row: f"{row['cases_yin']} ({row['cases_yin_low']}, {row['cases_yin_up']})",
    axis=1
)

df['cases_yin_ideal_formatted'] = df.apply(
    lambda row: f"{row['cases_yin_ideal']} ({row['cases_yin_ideal_low']}, {row['cases_yin_ideal_up']})",
    axis=1
)

df['cases_wu_formatted'] = df.apply(
    lambda row: f"{row['cases_wu']} ({row['cases_wu_low']}, {row['cases_wu_up']})",
    axis=1
)

df['cases_wu_ideal_formatted'] = df.apply(
    lambda row: f"{row['cases_wu_ideal']} ({row['cases_wu_ideal_low']}, {row['cases_wu_ideal_up']})",
    axis=1
)


#Re-scale the counterfactual increase
df["cft_increase_formatted"] = df["cft_increase"].map(lambda x: f"{100*float(x - 1):,.0f}")
df["coverage_formatted"] = df["coverage"].map(lambda x: f"{float(x):,.0f}")

df['coverage_formatted'] = df.apply(
    lambda row: f"{row['coverage_formatted']} (↑ {row['cft_increase_formatted']})",
    axis=1
)



In [10]:
# Reshape table for paper
presentation_df = pd.DataFrame({
    'Source': [
        "","", "Wu et al", "Wu et al", "yin et al", "yin et al"
    ],
    '2030': [
        "Guidelines",
        df.loc[df['year'] == 2030, 'coverage_formatted'].values[0],
        df.loc[df['year'] == 2030, 'pif_wu_formatted'].values[0],
        df.loc[df['year'] == 2030, 'cases_wu_formatted'].values[0],
        df.loc[df['year'] == 2030, 'pif_yin_formatted'].values[0],
        df.loc[df['year'] == 2030, 'cases_yin_formatted'].values[0]
    ],
    '2030 ': [
        "Ideal",
        "100 (↑ 128)",
        df.loc[df['year'] == 2030, 'pif_wu_ideal_formatted'].values[0],
        df.loc[df['year'] == 2030, 'cases_wu_ideal_formatted'].values[0],
        df.loc[df['year'] == 2030, 'pif_yin_ideal_formatted'].values[0],
        df.loc[df['year'] == 2030, 'cases_yin_ideal_formatted'].values[0]
    ],
    '2040': [
        "Guidelines",
        df.loc[df['year'] == 2040, 'coverage_formatted'].values[0],
        df.loc[df['year'] == 2040, 'pif_wu_formatted'].values[0],
        df.loc[df['year'] == 2040, 'cases_wu_formatted'].values[0],
        df.loc[df['year'] == 2040, 'pif_yin_formatted'].values[0],
        df.loc[df['year'] == 2040, 'cases_yin_formatted'].values[0]
    ],
    '2040 ': [
        "Ideal",
        "100 (↑ 128)",
        df.loc[df['year'] == 2040, 'pif_wu_ideal_formatted'].values[0],
        df.loc[df['year'] == 2040, 'cases_wu_ideal_formatted'].values[0],
        df.loc[df['year'] == 2040, 'pif_yin_ideal_formatted'].values[0],
        df.loc[df['year'] == 2040, 'cases_yin_ideal_formatted'].values[0]
    ],
    '2050': [
        "Guidelines",
        df.loc[df['year'] == 2050, 'coverage_formatted'].values[0],
        df.loc[df['year'] == 2050, 'pif_wu_formatted'].values[0],
        df.loc[df['year'] == 2050, 'cases_wu_formatted'].values[0],
        df.loc[df['year'] == 2050, 'pif_yin_formatted'].values[0],
        df.loc[df['year'] == 2050, 'cases_yin_formatted'].values[0]
    ],
    '2050 ': [
        "Ideal",
        "100 (↑ 128)",
        df.loc[df['year'] == 2050, 'pif_wu_ideal_formatted'].values[0],
        df.loc[df['year'] == 2050, 'cases_wu_ideal_formatted'].values[0],
        df.loc[df['year'] == 2050, 'pif_yin_ideal_formatted'].values[0],
        df.loc[df['year'] == 2050, 'cases_yin_ideal_formatted'].values[0]
    ],
    '2060': [
        "Guidelines",
        df.loc[df['year'] == 2060, 'coverage_formatted'].values[0],
        df.loc[df['year'] == 2060, 'pif_wu_formatted'].values[0],
        df.loc[df['year'] == 2060, 'cases_wu_formatted'].values[0],
        df.loc[df['year'] == 2060, 'pif_yin_formatted'].values[0],
        df.loc[df['year'] == 2060, 'cases_yin_formatted'].values[0]
    ],
    '2060 ': [
        "Ideal",
        "100 (↑ 128)",
        df.loc[df['year'] == 2060, 'pif_wu_ideal_formatted'].values[0],
        df.loc[df['year'] == 2060, 'cases_wu_ideal_formatted'].values[0],
        df.loc[df['year'] == 2060, 'pif_yin_ideal_formatted'].values[0],
        df.loc[df['year'] == 2060, 'cases_yin_ideal_formatted'].values[0]
    ]

}, index=["Scenario","Vaccination % coverage (↑ % increase)",
          "Potential Impact Fraction (%)", "Prevented Cases (%)",
          "Potential Impact Fraction (%)", "Prevented Cases (%)"])


# Generate filename with today's date (e.g., "pif_shingles_results_2023-11-15.csv")
today = datetime.now().strftime("%Y_%b_%d")
filename = f"pif_shingles_results_{today}.csv"

# Save
presentation_df.to_csv(filename, index=True, encoding='utf-8')
display(presentation_df)

Unnamed: 0,Source,2030,2030.1,2040,2040.1,2050,2050.1,2060,2060.1
Scenario,,Guidelines,Ideal,Guidelines,Ideal,Guidelines,Ideal,Guidelines,Ideal
Vaccination % coverage (↑ % increase),,50 (↑ 15),100 (↑ 128),57 (↑ 30),100 (↑ 128),64 (↑ 45),100 (↑ 128),70 (↑ 60),100 (↑ 128)
Potential Impact Fraction (%),Wu et al,"2.36 (1.07, 3.64)","20.16 (18.02, 22.29)","4.71 (3.41, 6.02)","20.16 (18.02, 22.29)","7.07 (5.70, 8.44)","20.16 (18.02, 22.29)","9.43 (7.97, 10.89)","20.16 (18.02, 22.29)"
Prevented Cases (%),Wu et al,"32 (14, 50)","276 (229, 323)","124 (87, 161)","530 (448, 612)","111 (72, 150)","316 (217, 416)","106 (41, 170)","226 (91, 361)"
Potential Impact Fraction (%),yin et al,"2.18 (0.93, 3.43)","18.67 (14.85, 22.49)","4.37 (2.93, 5.80)","18.67 (14.85, 22.49)","6.55 (4.84, 8.26)","18.67 (14.85, 22.49)","8.73 (6.69, 10.77)","18.67 (14.85, 22.49)"
Prevented Cases (%),yin et al,"30 (12, 48)","256 (193, 318)","115 (75, 155)","491 (376, 606)","103 (62, 144)","293 (187, 399)","98 (36, 160)","209 (78, 340)"


#Reproducibility

In [11]:
session_info.show()