In [1]:
import numpy as np
import pandas as pd
from scipy.stats import invgauss
from lifelines import CoxPHFitter

### Explore HR thta would manifest in a real-world cohort

In [2]:
def rinvg(theta_0, N):  # random number following invG    
    lam = 2 * theta_0  
    mu = 1 / lam
    y  = invgauss.rvs( mu, loc = 0, scale = lam, size=N)
    return y

In [3]:
def calc_lambda_0(p0, theta_0, delta, rate, ln_r, invG): # returns lambda0 given parameters and initial risk p0
    
    if invG:  # frailty = inverse Gaussian  
        y = - 0.5 * np.log(1- p0)/delta + delta
        y =  y**2 - theta_0
        y /= rate - 1  
        return y * ln_r
    
    # frailty = Gamma       
    y = -np.log(1-p0)/delta
    y = (np.exp(y)-1)/(rate - 1) 
    return theta_0 * y * ln_r       

In [4]:
def cohort_size(RR, RR_0, rate, p0, start, end):
    '''
    Returns a cohort_size given RR, rate, p0, t_0, and t_obs
      1. calculate a sample size of each arm of a hypothetical RCT over [t_0, t_obs] to ensure 90% power to detect RR vs RR_0 with a two-sided 0.05 significance level,
         where RR_0 denotes the minimal RR that has clinical significance      
      2. return a cohort_size such that expected # of positive prior history satisfies the required sample size
       
    '''
    h0  = -np.log(1 - p0)   
    h0 /=  rate - 1                      # h0 = lambda_0/log(rate)
    if RR_0 is None:        
        RR_0 = 1 + 0.5*(RR-1)            # the default minimal effect is 50% of the assumed RR        
    At = (rate**end - rate**start) * h0  # cumulative hazard from entry to end 
    A0 = (rate**start - 1) * h0          # cumulative hazard at the entry
    S = np.exp(-At)
    n = (RR + 1)*np.sqrt(RR_0)*1.96 + (RR_0 + 1)*np.sqrt(RR)*0.84
    n /= RR - RR_0
    n **= 2               # required # of events
    n /= 2 - S - S**RR    # sample size of each arm
    n /= 1 - np.exp(-A0)  # cohort size = arm size / expectation of prior history
    return  (n//1000 + 1).astype(int) * 1000

In [5]:
def generate_event(lambda_0, rate, ln_r, N):  # returns time to an event and the hazard at that time   # ln_r = log(rate)
    
    t = np.random.rand(N)                      # inverse transform sampling using 
    t = 1 - ln_r * np.log(1 - t)/lambda_0      
    t = np.log(t) / ln_r
    
    return t, lambda_0 * rate**t

def generate_event_d(lambda_0, rate, ln_r, N, start, d):    
    '''
    arg 
      d: maximal number of events per individual; set an integer large enough to ensure the d-th event occurs after entry
      ln_r = log(rate)
    value
      (N, K)-numpy array of times to events 
       row; individuals
       col: times to event 1, 2, ..., K,    
    '''    
    lambda_t = lambda_0
    t = 0
    tmin = 0    
    t_ev_list = np.empty((N,d))
    i = 0
    while (tmin < start):
        t_ev, lambda_t = generate_event(lambda_t, rate, ln_r, N)
        t += t_ev
        t_ev_list[:,i] = t
        tmin = t.min()   
        i += 1
        if i >= d:
            raise ValueError("Final event(s) occurred before entry! Set a larger d")
    return t_ev_list[:, :i]

def sim_cohort(lambda_0, theta_0, rate, ln_r, start, follow, N, d, invG):

    if invG:         
        t = rinvg(theta_0, N)       
    else:            
        t = np.random.gamma(theta_0, 1/theta_0, N)    
       
    t *= lambda_0
  
    t = generate_event_d(t, rate, ln_r, N, start, d) # N x K array of time to events (K <= d)

    history = t.min(axis = 1) < start   # prior history is positive if the first event occurred before entry
    t -= start                          # time from entry to events
    t = np.where(t > 0, t, 20)          # discard prior events (negative t) by making them sufficiently late
    t = np.min(t, axis = 1)             # time to the first event after entry 
    event = t < follow  
    t = np.where(event, t, follow)      # T = event*t + (1-event)*follow
    t = pd.DataFrame({"T": t, "event": event, "history": history})

    history  = history.mean()
    event = t.groupby("history")["event"].mean().values 
    cph = CoxPHFitter()
    cph.fit(t[["T", 'event', 'history']], duration_col = 'T', event_col = 'event')
       
    return cph.params_[0].round(5), cph.variance_matrix_.values[0,0].round(5),  np.round(history,3),  *event.round(3)   

In [6]:
def calc_summary(results, N, Nsim): 
    '''
    values: an array
         [0] coefficient (logHR) of prior history
         [1]  95% CI of coefficient, lower 
         [2]  95% CI of coefficient, upper 
         [3] HR of prior history
         [4]  95% CI of HR, lower 
         [5]  95% CI of HR, upper
         [6] proportion of positive prior history
         [7] proportion of failure among those without prior history  
         [8] proportion of failure among those with prior history  
         [9] cohort size
    '''
    v = results[:, 0].var()
    mu = results[:, 0].mean()
    v += np.mean(results[:, 1]**2)    # Var[X] = Var[E[X]] + E[Var[X]]
    se = np.sqrt(v/Nsim)     
    ci = mu + se * np.array([-1.959964, 1.959964])
    mean_outcomes = results[:,2:].mean(axis = 0)   # history_positive, events among history_negative, events among history_positive  
    res = np.hstack([mu, ci, np.exp(mu), np.exp(ci), mean_outcomes]).round(4)
    return  *res, N

In [7]:
def simulate(lambda_0, theta_0, delta, rate, ln_r, invG, start, follow, N, d, Nsim):
    res = list(map(lambda x: sim_cohort(lambda_0, theta_0, rate, ln_r, start, follow, N, d, invG), range(Nsim)))
    res = calc_summary(np.array(res), N, Nsim)
    return  res

In [8]:
def execute(r0, theta_0, delta, rate, ln_r, invG, start, follow, N, d, Nsim):
    lambda_0 = calc_lambda_0(r0, theta_0, delta, rate, ln_r, invG = invG)
    res = list(map(lambda lam, th, de, n: simulate(lam, th, de, rate, ln_r, invG, start, follow, n, d, Nsim), lambda_0, theta_0, delta, N) )
    return res

### Cohort setting

In [9]:
end = 5
start = 2.5
follow = end - start
r0 = np.arange(2,7,2)*0.01

### Simulation setting

In [10]:
Nsim = 20
d = 100

### Parameters

In [11]:
invG = False      # for Gamma
theta_0 = [0.70, 0.73, 0.76]   
delta = theta_0
RR = 2.5
rate = 1.5**0.1
ln_r = np.log(rate)

In [None]:
invG = True      # for inv-Gaussian
theta_0 = [0.28, 0.26, 0.25]
delta = np.sqrt(theta_0)
RR = 2.75
rate = 1.5**0.1
ln_r = np.log(rate)

## Simulate

In [12]:
N = cohort_size(RR, None, rate, r0, start, end)
print(N)
res = execute(r0, theta_0, delta, rate, ln_r, invG, start, follow, N, d, Nsim)
res = pd.DataFrame(res)
res.columns = ["beta", "beta2.5", "beta97.5", "HR", "HR2.5", "HR97.5","history", "event0", "event1", "N"]
res.index = r0

[30000  8000  4000]


In [13]:
res

Unnamed: 0,beta,beta2.5,beta97.5,HR,HR2.5,HR97.5,history,event0,event1,N
0.02,0.9213,0.885,0.9576,2.5126,2.4229,2.6055,0.0499,0.0509,0.1235,30000
0.04,0.9175,0.8852,0.9498,2.5031,2.4235,2.5853,0.0952,0.093,0.2167,8000
0.06,0.944,0.9053,0.9827,2.5703,2.4728,2.6716,0.1404,0.1274,0.2958,4000
