In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas
import cmdstanpy
from cmdstanpy import CmdStanModel
import pandas
import scipy.stats
import scipy.integrate
import random

Read in Data and plot each time series.

In [None]:
df = pandas.read_excel('23_05_22 tet variable EML.xlsx', skiprows=7, sheet_name='Virus over cell area')

t = df['Elapsed'].values

data = {}

percentages = [0, 20, 40, 60, 80, 100]

for percentage in percentages:
    x = df['SINV mCherry 10 MOI,{}% W(+) (1) 500K / well'.format(percentage)].values
    xerr = df['SINV mCherry 10 MOI,{}% W(+) (1) 500K / well (Std Err Well)'.format(percentage)].values
    data[percentage] = x

fig = plt.figure(figsize=(10.5, 4))
N = len(percentages)

for i, percentage in enumerate(percentages):
    ax = fig.add_subplot(2, N, i+1)
    x = data[percentage]

    ax.plot(t, x, color='k', zorder=-1)
    p1 = ax.scatter(t, x, color='k', s=25)
    p2 = ax.scatter(t, x, color='white', s=5)

    ax.set_title('{}% W(+)'.format(percentage))

    if i == N-1:
        ax.legend(((p1, p2),), ['Data',])

    if i == 0:
        ax.set_ylabel('Virus over cell area')

    ax = fig.add_subplot(2, N, i+1+N)

    p1, = ax.plot(t, x, color='k')
    p2 = ax.fill_between(t, x-xerr, x+xerr, alpha=0.25, color='k')
    
    ax.set_xlabel('Time (hours)')

    if i == N-1:
        ax.legend(((p1, p2),), [r'Data $\pm$ Std Err',])

    if i == 0:
        ax.set_ylabel('Virus over cell area')


fig.set_tight_layout(True)

plt.show()


Run prior predictive checks.

In [None]:
# Prior predictive

num_prior_samples = 10000

all_solns_compound = []

for _ in range(num_prior_samples):
    theta = scipy.stats.truncnorm.rvs(0, np.inf, loc=0, scale=1, size=8)
    area_ratio = random.uniform(0.8, 1.0)
    V_0 = scipy.stats.truncnorm.rvs(0.01, 10000, loc=0, scale=0.1)
    colonized_prop = random.uniform(0.1, 1.0)
    
    full_soln = []
    for i in range(6):

        y0 = [
            (1.0 - colonized_prop * 0.2 * i) * area_ratio,
            0.0,
            V_0,
            0.0,
            0.0,
        ]

        def f(t, y):
            T = y[0]
            I = y[1]
            V = y[2]
            P = y[3]
            C = y[4]

            dydt = [0, 0, 0, 0, 0]

            dydt[0] = -theta[0] * T * V - theta[3] * C * T
            dydt[1] = theta[0] * T * V - theta[3] * C * I
            dydt[2] = theta[1] * I - theta[2] * C * V
            dydt[3] = theta[3] * C * T + theta[3] * C * I
            dydt[4] = theta[4] * 0.2 * i - theta[5] * C * T - theta[5] * C * I - theta[6] * C - theta[7] * C * V

            return dydt

        sol = scipy.integrate.solve_ivp(
            f,
            (0, 148),
            y0,
            t_eval=t,
        ).y

        full_soln.append(sol)

    all_solns_compound.append(full_soln)

all_solns_compound = np.asarray(all_solns_compound)


# Prior predictive Extended Model
all_solns_extended = []

for _ in range(num_prior_samples):

    theta = scipy.stats.truncnorm.rvs(0, np.inf, loc=0, scale=1, size=10)
    area_ratio = random.uniform(0.8, 1.0)
    V_0 = scipy.stats.truncnorm.rvs(0.01, 10000, loc=0, scale=0.1)
    colonized_prop = random.uniform(0.1, 1.0)

    full_soln = []
    for i in range(6):

        y0 = [
            (1.0 - colonized_prop * 0.2 * i) * area_ratio,
            0.0,
            V_0,
            0.0,
        ]

        def f(t, y):
            T = y[0]
            I = y[1]
            V = y[2]
            E = y[3]

            dydt = [0, 0, 0, 0,]

            beta = theta[0]
            p = theta[1]
            L = theta[2]
            e = theta[3]
            s = theta[4]
            A1 = theta[5]
            A2 = theta[6]
            l1 = theta[7]
            l2 = theta[8]
            tstar = theta[9]

            dydt[0] = -beta * T * V / (L + V)
            dydt[1] = e * E

            if t <= tstar:
                F = A1 * np.exp(l1 * t)
            else:
                F = A2 * np.exp(-l2 * t)
            dydt[2] = p * I / (1.0 + s * F)

            dydt[3] = beta * T * V / (L + V) - e * E

            return dydt

        sol = scipy.integrate.solve_ivp(
            f,
            (0, 148),
            y0,
            t_eval=t,
        ).y

        full_soln.append(sol)

    all_solns_extended.append(full_soln)

all_solns_extended = np.asarray(all_solns_extended)


# Prior predictive Basic Model
all_solns_basic = []

for _ in range(num_prior_samples):

    theta = scipy.stats.truncnorm.rvs(0, np.inf, loc=0, scale=1, size=2)
    area_ratio = random.uniform(0.8, 1.0)
    V_0 = scipy.stats.truncnorm.rvs(0.01, 10000, loc=0, scale=0.1)
    colonized_prop = random.uniform(0.1, 1.0)

    full_soln = []
    for i in range(6):

        y0 = [
            (1.0 - colonized_prop * 0.2 * i) * area_ratio,
            0.0,
            V_0,
        ]

        def f(t, y):
            T = y[0]
            I = y[1]
            V = y[2]

            dydt = [0, 0, 0,]

            beta = theta[0]
            p = theta[1]

            dydt[0] = -beta * T * V
            dydt[1] = beta * T * V
            dydt[2] = p * I
            return dydt

        sol = scipy.integrate.solve_ivp(
            f,
            (0, 148),
            y0,
            t_eval=t,
        ).y

        full_soln.append(sol)

    all_solns_basic.append(full_soln)

all_solns_basic = np.asarray(all_solns_basic)

Plot prior predictive results.

In [None]:
fig = plt.figure(figsize=(12, 6))

i = 1
style = 0
ax = None

model_names = ['Compound Model', 'Basic Comparator', 'Extended Comparator']

for model in [1, 2, 3]:

    if model == 1:
        all_solns = all_solns_compound
    elif model == 2:
        all_solns = all_solns_basic
    elif model == 3:
        all_solns = all_solns_extended

    for j in range(6):
        ax = fig.add_subplot(3, 6, j + 1 + 6 * (model - 1), sharey=ax, sharex=ax)

        if style == 0:
            mid = np.median(all_solns[:, j, i, :], axis=0)
            upper = np.percentile(all_solns[:, j, i, :], 99.5, axis=0)
            lower = np.percentile(all_solns[:, j, i, :], 0.5, axis=0)
            l1, = ax.plot(t, 100 * mid)
            l2 = ax.fill_between(t, 100 * lower, 100 * upper, alpha=0.25)
    
        elif style == 1:
            for traj in all_solns[::50, j, i, :]:
                l1, = ax.plot(t, 100 * traj, alpha=0.2, color='tab:blue')
                l2 = l1

        if model == 3:
            ax.set_xlabel('Time (hours)')
        if j == 0:
            ax.set_ylabel('I\n({})'.format(model_names[model-1]))

        if model == 1:
            ax.set_title('{}% W'.format(j * 20))

        x = data[20 * j]
        l3, = ax.plot(t, x, color='k', zorder=100000, lw=2, ls='--', label='Data')

        if model == 1 and j == 5:
            ax.legend(((l1, l2,), l3,), ['Prior (99%)', 'Data'], loc='upper right')


fig.set_tight_layout(True)

plt.savefig('prior_all_models.pdf')

plt.show()

Run inference

In [None]:
model = CmdStanModel(stan_file='model.stan', cpp_options={'STAN_THREADS':'true'})

In [None]:
def sample_prior(n_theta, n_sigma):
    """Generate a sample from the prior.
    
    Parameters
    ----------
    n_theta : int
        Dimension of theta parameter vector
    n_sigma : int
        Dimension of sigma noise parameter vector

    Returns
    -------
    dict
        Parameter values sampled from the prior distribution
    """
    theta = []
    area_ratio = []
    V_0 = []
    sigma = []

    for _ in range(1):
        theta.append(scipy.stats.truncnorm.rvs(0, 1000, loc=0, scale=1, size=n_theta))
        area_ratio.append(scipy.stats.uniform.rvs(loc=0.8, scale=0.2))
        V_0.append(scipy.stats.uniform.rvs(loc=0.02, scale=0.08, size=1))
        sigma.append(scipy.stats.truncnorm.rvs(0, 1000, loc=0, scale=0.01, size=n_sigma))

    return {'theta': theta[0], 'area_ratio': area_ratio[0], 'V_0': V_0[0][0], 'sigma': sigma[0],}

In [None]:
model_types = [1, 2, 3]
num_repeats = 20

results = {}

for model_type in model_types:

    stan_data = {
        'T': len(t),
        'N': 6,
        'v': np.asarray([data[0], data[20], data[40], data[60], data[80], data[100],]).T / 100.0,
        'ts': t,
        'model_type': model_type,
        'use_priors': 1,
    }

    cmdstanpy.write_stan_json('data.json', stan_data)

    best_fit = None
    best_value = -np.inf

    for _ in range(num_repeats):
        try:
            fit = model.optimize(data='data.json', inits=sample_prior(8 if model_type == 1 else 10 if model_type == 3 else 2, stan_data['N']))
            lp = fit.optimized_params_pd['lp__'].values[0]
        except RuntimeError as e:
            print(e)
            lp = -np.inf
        if lp > best_value:
            best_fit = fit
            best_value = lp

    results[model_type] = [best_fit, best_value]

In [None]:
model_types = [1, 2, 3]
num_repeats = 20

results_mle = {}

for model_type in model_types:

    stan_data = {
        'T': len(t),
        'N': 6,
        'v': np.asarray([data[0], data[20], data[40], data[60], data[80], data[100],]).T / 100.0,
        'ts': t,
        'model_type': model_type,
        'use_priors': 0,
    }

    cmdstanpy.write_stan_json('data.json', stan_data)

    best_fit = None
    best_value = -np.inf

    for _ in range(num_repeats):
        try:
            init = {'theta': results[model_type][0].theta, 'colonized_prop': results[model_type][0].colonized_prop, 'area_ratio': results[model_type][0].area_ratio, 'V_0': results[model_type][0].V_0, 'sigma': results[model_type][0].sigma}
            fit = model.optimize(data='data.json', inits=init)
            lp = fit.optimized_params_pd['lp__'].values[0]
        except RuntimeError as e:
            print(e)
            lp = -np.inf
        if lp > best_value:
            best_fit = fit
            best_value = lp

    results_mle[model_type] = [best_fit, best_value]

In [None]:
def MAE(fit, data):
    abs_errors = []
    for i in range(5):
        y = fit.simulated_y[:, 1, i]
        abs_errors += list(100.0 * np.abs(y - data.T[i]))
    return np.mean(abs_errors)

model_num_params = {1: 17, 2: 11, 3: 19}

In [None]:
# Compute metrics of comparison
models = [1, 2, 3]

model_names = ['Protective Compound', 'Basic Comparator', 'Extended Comparator']
mlp = [results[model_type][1] for model_type in models]
mll = [results_mle[model_type][0].log_likelihood for model_type in models]
aic = [2 * model_num_params[model_type] - 2 * results_mle[model_type][0].log_likelihood for model_type in models]
mae = [MAE(results[model_type][0], stan_data['v']) for model_type in models]


df = pandas.DataFrame(
    {
     'Model': model_names,
     'Maximum Log Posterior': mlp,
      'Mean Absolute Error': mae,
     'Maximum Log Likelihood': mll,
     'AIC': aic,
    }
    )

df.to_excel('comparison_statistics.xlsx', index=False, float_format="%.3f")

In [None]:
lss = ['o-', 'd-', 'v-', '^-', 'h-']
lsss = ['-', '-.', '--', ':', '-']
colors = [(252, 60, 69), (203, 112, 25), (141, 148, 25), (81, 168, 71), (25, 177, 134), (23, 176, 192)]
colors = [[x/255 for x in y] for y in colors]

labels = []
data_labels = []
names = []

T_ax = None

datas = [0, 20, 40, 60, 80]

fig = plt.figure(figsize=(8, 3.5))

ax = fig.add_subplot(1, 2, 1)

for _i, i in enumerate([1, 2, 3, 4, 5]):

    x = data[datas[_i]]
    p2 = ax.scatter(t, x, s=6, marker=lss[i-1][0], color=colors[i-1])

    fit = results[1][0]
    p3, = ax.plot(t, 100 * fit.simulated_y[:, 1, _i], zorder=5, lw=2, color=colors[_i])

    labels.append((p3,))
    names.append('{}% W(+)'.format(datas[_i]))
    data_labels.append((p2,))


ax.set_xlabel('Time (Hours)')
ax.set_ylabel('Virus over cell area (%)')

s0 = ax.scatter([1, ], [110,], marker='$/$', color='k', label='Data', s=100)
ax.set_ylim(None, 95)

ax.legend(data_labels + [s0,] * 5 + labels, [''] * 10 + names, title='Data / Fit                  ', loc='center left', bbox_to_anchor=(1, 0.75), ncol=3, handlelength=1.5, borderpad=0.7, handletextpad=1.25, columnspacing=-1.5)

fig.set_tight_layout(True)

plt.show()

In [None]:
df_fits = pandas.DataFrame(
    {
        'Time': t,
        '0% W': 100 * results[1][0].simulated_y[:, 1, 0],
        '20% W': 100 * results[1][0].simulated_y[:, 1, 1],
        '40% W': 100 * results[1][0].simulated_y[:, 1, 2],
        '60% W': 100 * results[1][0].simulated_y[:, 1, 3],
        '80% W': 100 * results[1][0].simulated_y[:, 1, 4],
        '100% W': 100 * results[1][0].simulated_y[:, 1, 5],
    }
)
df_fits.to_excel('fit_compound.xlsx', index=False, float_format="%.3f")


df_fits = pandas.DataFrame(
    {
        'Time': t,
        '0% W': 100 * results[2][0].simulated_y[:, 1, 0],
        '20% W': 100 * results[2][0].simulated_y[:, 1, 1],
        '40% W': 100 * results[2][0].simulated_y[:, 1, 2],
        '60% W': 100 * results[2][0].simulated_y[:, 1, 3],
        '80% W': 100 * results[2][0].simulated_y[:, 1, 4],
        '100% W': 100 * results[2][0].simulated_y[:, 1, 5],
    }
)
df_fits.to_excel('fit_basic_comparator.xlsx', index=False, float_format="%.3f")


df_fits = pandas.DataFrame(
    {
        'Time': t,
        '0% W': 100 * results[3][0].simulated_y[:, 1, 0],
        '20% W': 100 * results[3][0].simulated_y[:, 1, 1],
        '40% W': 100 * results[3][0].simulated_y[:, 1, 2],
        '60% W': 100 * results[3][0].simulated_y[:, 1, 3],
        '80% W': 100 * results[3][0].simulated_y[:, 1, 4],
        '100% W': 100 * results[3][0].simulated_y[:, 1, 5],
    }
)
df_fits.to_excel('fit_extended_comparator.xlsx', index=False, float_format="%.3f")