In [13]:
import os
import csv
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
from scipy import stats
from scipy.stats import linregress
from scipy.interpolate import interp1d
from scipy.integrate import solve_ivp
from joblib import Parallel, delayed
from multiprocessing import Pool

# **I. Data Preparation**

In [14]:
# a function to read excel raw data
def extract_data(df, df_skipped, name):
    idx = df.columns.get_loc(f'% {name}')
    cols = [df.columns[idx], df.columns[idx + 1]]
    sub_df = df_skipped[cols]
    sub_df = sub_df[
        ~(
            sub_df.apply(lambda row: row.map(lambda x: isinstance(x, str)) | row.isna(), axis=1)
        ).any(axis=1)
    ]
    return sub_df

In [18]:
# reading excel raw data, and save each experiment as name_df (e.g. control1_df)

df = pd.read_excel('/path/to/psMEK_raw_scaled.xlsx', engine='openpyxl')  # change to the path of your raw data spreadsheet
df_skipped = df.iloc[2:]

names = ['control1', 'continuous', 'control2', 'A1', 'A2', 'control3', 'B1', 'B2', 'B3', 'control4', 'C1', 'C2', 'control5', 'D1', 'D2', 'D3']
for name in names:
    globals()[f'{name}_df'] = extract_data(df, df_skipped, name)

In [19]:
# raw data have pupation times in days, now convert them to hours (hAEL)
# save each experiment as name_expanded (e.g. control1_expanded)

for name in names:
    df_var = globals()[f'{name}_df']
    expanded = np.repeat(
        df_var.iloc[:, 0].astype(float).values,
        df_var.iloc[:, 1].astype(int).values
    ) * 24
    globals()[f'{name}_expand'] = expanded

In [20]:
# for each treatment (perturbation), we have a paired control: control 1,2,3,4,5
# calculate the scaling factor for each treatment (the factor that scales the mean of control group to the standard 130 hours)
# scaling factor = 130 hours / control_mean

standard_ctrl = 130
control_names = [f'control{i}' for i in range(1, 6)]

for name in control_names:
    expand_var = globals()[f'{name}_expand']
    mean_val = np.mean(expand_var)
    globals()[f'{name}_mean'] = mean_val
    globals()[f'{name}_factor'] = standard_ctrl / mean_val

In [21]:
# Assume that L3 starts around 70 hAEL for a control-group larva that experiences a 130-hour total development (egg to pupation)
# Same for all treatment groups. L3 starts around 70 hAEL because our optogenetic tool only affects L3.

# 1. normalization of raw data (astronomical times --> developmental times)
# 2. convert hAEL (hours after egg lay) to AL3E (hours spent in L3)
continuous_L3 = continuous_expand*control1_factor - 70
A1_L3 = A1_expand*control2_factor - 70
A2_L3 = A2_expand*control2_factor - 70
B1_L3 = B1_expand*control3_factor - 70
B2_L3 = B2_expand*control3_factor - 70
B3_L3 = B3_expand*control3_factor - 70
C1_L3 = C1_expand*control4_factor - 70
C2_L3 = C2_expand*control4_factor - 70
D1_L3 = D1_expand*control5_factor - 70
D2_L3 = D2_expand*control5_factor - 70
D3_L3 = D3_expand*control5_factor - 70

# mean pupation times, hAL3E
continuous_mean = np.mean(continuous_L3)
A1_mean = np.mean(A1_L3)
A2_mean = np.mean(A2_L3)
B1_mean = np.mean(B1_L3)
B2_mean = np.mean(B2_L3)
B3_mean = np.mean(B3_L3)
C1_mean = np.mean(C1_L3)
C2_mean = np.mean(C2_L3)
D1_mean = np.mean(D1_L3)
D2_mean = np.mean(D2_L3)
D3_mean = np.mean(D3_L3)

# We will only use A1-B3 for fitting (optimization).
# Calculate hours of advancement (hAL3E) for these groups
diff_means = [60-continuous_mean, 60-A2_mean, 60-A1_mean, 60-B3_mean, 60-B2_mean, 60-B1_mean]

In [23]:
# normalization of light on/off times (astronomical times --> developmental times)
continuous_start = 58 * control1_factor
continuous_end = 130
A1_start = 58 * control2_factor
A1_end = 88 * control2_factor
A2_start = 88 * control2_factor
A2_end = 118 * control2_factor
B1_start = 70 * control3_factor
B1_end = 85 * control3_factor
B2_start = 85 * control3_factor
B2_end = 100 * control3_factor
B3_start = 100 * control3_factor
B3_end = 115 * control3_factor
C1_start = 75 * control4_factor
C1_end = 107 * control4_factor
C2_start = 91 * control4_factor
C2_end = 123 * control4_factor
D1_start = 74.5 * control5_factor
D1_end = 90.5 * control5_factor
D2_start = 90.5 * control5_factor
D2_end = 106.5 * control5_factor
D3_start = 106.5 * control5_factor
D3_end = 122.5 * control5_factor

# Then convert light on/off developmental times from hAEL to hAL3E
continuous_start_L3 = 0
continuous_end_L3 = continuous_end - 70
A1_start_L3 = 0
A1_end_L3 = A1_end - 70
A2_start_L3 = A2_start - 70
A2_end_L3 = A2_end - 70
B1_start_L3 = 0
B1_end_L3 = B1_end - 70
B2_start_L3 = B2_start - 70
B2_end_L3 = B2_end - 70
B3_start_L3 = B3_start - 70
B3_end_L3 = B3_end - 70
C1_start_L3 = C1_start - 70
C1_end_L3 = C1_end - 70
C2_start_L3 = C2_start - 70
C2_end_L3 = C2_end - 70
D1_start_L3 = D1_start - 70
D1_end_L3 = D1_end - 70
D2_start_L3 = D2_start - 70
D2_end_L3 = D2_end - 70
D3_start_L3 = D3_start - 70
D3_end_L3 = D3_end - 70

# **II. Parameter Inference**
The following blocks are dedicated to parameter inference (optimization) and bootstrapping. They calculate the optimal parameter set and the 95% confidence interval for the prediction data (Figure 3-4).  
The blocks will save CSV files for each grid search. In other words, the optimal parameter set for each bootstrapped dataset will be saved, so once the process is complete, you wonâ€™t need to run it again.

In [None]:
''' Functions '''

# a function of bootstrapping the control & treatment groups together
def bootstrap_delta_distributions(raw_pert_arrays, raw_ctrl_arrays, num_bootstrap=9999):
    
    Delta_dist = [[] for _ in range(len(raw_pert_arrays))]
    Delta_stdv = []
    
    for _ in range(num_bootstrap):
        
        resampled_ctrl1 = np.random.choice(raw_ctrl_arrays[0], size=len(raw_ctrl_arrays[0]), replace=True)
        resampled_ctrl2 = np.random.choice(raw_ctrl_arrays[1], size=len(raw_ctrl_arrays[1]), replace=True)
        resampled_ctrl3 = np.random.choice(raw_ctrl_arrays[2], size=len(raw_ctrl_arrays[2]), replace=True)
        
        resampled_continuous = np.random.choice(raw_pert_arrays[0], size=len(raw_pert_arrays[0]), replace=True)
        resampled_A2 = np.random.choice(raw_pert_arrays[1], size=len(raw_pert_arrays[1]), replace=True)
        resampled_A1 = np.random.choice(raw_pert_arrays[2], size=len(raw_pert_arrays[2]), replace=True)
        resampled_B3 = np.random.choice(raw_pert_arrays[3], size=len(raw_pert_arrays[3]), replace=True)
        resampled_B2 = np.random.choice(raw_pert_arrays[4], size=len(raw_pert_arrays[4]), replace=True)
        resampled_B1 = np.random.choice(raw_pert_arrays[5], size=len(raw_pert_arrays[5]), replace=True)

        
        Delta_continuous = (np.mean(resampled_ctrl1) - np.mean(resampled_continuous)) * (130 / np.mean(resampled_ctrl1))
        Delta_A2 = (np.mean(resampled_ctrl2) - np.mean(resampled_A2)) * (130 / np.mean(resampled_ctrl2))
        Delta_A1 = (np.mean(resampled_ctrl2) - np.mean(resampled_A1)) * (130 / np.mean(resampled_ctrl2))
        Delta_B3 = (np.mean(resampled_ctrl3) - np.mean(resampled_B3)) * (130 / np.mean(resampled_ctrl3))
        Delta_B2 = (np.mean(resampled_ctrl3) - np.mean(resampled_B2)) * (130 / np.mean(resampled_ctrl3))
        Delta_B1 = (np.mean(resampled_ctrl3) - np.mean(resampled_B1)) * (130 / np.mean(resampled_ctrl3))
                
        Delta_dist[0].append(Delta_continuous)
        Delta_dist[1].append(Delta_A2)
        Delta_dist[2].append(Delta_A1)
        Delta_dist[3].append(Delta_B3)
        Delta_dist[4].append(Delta_B2)
        Delta_dist[5].append(Delta_B1)
    
    for delta in Delta_dist:
        Delta_stdv.append(np.std(delta))
    
    return Delta_dist, Delta_stdv


# pulse function - extra signaling from the optogenetic tool
def pulse(x, A, t0, t1):
    return np.where((t0 <= x) & (x <= t1), A, 0)


# the function of our ODE
def odes(t, i, Amp, ti, tf, beta, x_stop):
    x, y = i
    F = 1 if x <= x_stop else 0
    dxdt = (1/2) * x**2 + (1/2) * y - x + pulse(t, Amp, ti, tf)
    dydt = beta * F
    return [dxdt, dydt]


# a function of solving for the blow-up time of one treatment
def solve_scenario(args, i0, beta, x_stop, t_span, target_x, ctrl_time, expand, stdv):
    solution = solve_ivp(odes, t_span, i0, args=(args[0], args[1], args[2], beta, x_stop), 
                         t_eval=np.linspace(0, t_end, 1000), max_step=0.01, method='RK45')
    x = solution.y[0]
    t = solution.t

    if np.any(x >= target_x):
        T_asymp = t[np.isfinite(x)].max() * 60 / ctrl_time
        return np.sum((expand - (60-T_asymp)) ** 2 / stdv**2) # taking the difference between prediction & experimental advancement in pupation times
    else:
        return np.inf # if the curve never blows up, define the blow-up time as infinity


# a function of computing the total cost function of all 6 treatments
def cost_function(params, data_times, data_stdv):
    beta = params["beta"]
    alpha = params["alpha"]
    x_stop = params["x_stop"]

    delta = 0
    x0 = 1 - np.sqrt(1 - delta)
    i0 = [x0, delta]

    # solve the ODE for the control scenario to get control blow-up time
    solution_ctrl = solve_ivp(odes, t_span, i0, args=(0, 0, 1, beta, x_stop), 
                              t_eval=np.linspace(0, t_end, 1000), max_step=0.01, method='RK45')
    ctrl_x = solution_ctrl.y[0]
    ctrl_t = solution_ctrl.t

    if np.any(ctrl_x >= target_x):
        ctrl_time = ctrl_t[np.isfinite(ctrl_x)].max()
    else:
        return np.inf

    # use control blow-up time to find the normalized light on/off time (hAL3E) for each treatment
    scenarios = [
        (alpha, 0, ctrl_time),
        (alpha, ctrl_time * A2_start_L3 / 60, ctrl_time * A2_end_L3 / 60),
        (alpha, 0, ctrl_time * A1_end_L3 / 60),
        (alpha, ctrl_time * B3_start_L3 / 60, ctrl_time * B3_end_L3 / 60),
        (alpha, ctrl_time * B2_start_L3 / 60, ctrl_time * B2_end_L3 / 60),
        (alpha, 0, ctrl_time * B1_end_L3 / 60)
    ]

    # find the total cost of all 6 treatments
    total_cost = []
    for args, expand, stdv in zip(scenarios, data_times, data_stdv):
        cost = solve_scenario(args, i0, beta, x_stop, t_span, target_x, ctrl_time, expand, stdv)
        total_cost.append(cost)

    return np.sum(total_cost)


# a function that returns the total cost associated with a given set of parameters
def compute_cost(beta, alpha, x_stop, data_times, data_stdv):
    cost = cost_function({'beta': beta, 'alpha': alpha, 'x_stop': x_stop}, data_times, data_stdv)
    return beta, alpha, x_stop, cost


# a funtion that saves all parameter values and their associated cost function into a CSV file
def save_result(result, file_directory):
    beta, alpha, x_stop, cost = result
    with open(file_directory, mode='a', newline='') as file:
        writer = csv.writer(file)
        writer.writerow([beta, alpha, x_stop, cost])
    i = np.where(beta_vals == beta)[0][0]
    j = np.where(alpha_vals == alpha)[0][0]
    k = np.where(x_stop_vals == x_stop)[0][0]
    cost_matrix[i, j, k] = cost

In [None]:
# Bootstrap the control & treatment groups together
Raw_pert_arrays = [continuous_expand, A2_expand, A1_expand, B3_expand, B2_expand, B1_expand]
Raw_ctrl_arrays = [control1_expand, control2_expand, control3_expand]

Delta_dist, Delta_stdv = bootstrap_delta_distributions(Raw_pert_arrays, Raw_ctrl_arrays)


# start and end simulation times
t_end = 10
t_span = (0, t_end)

# if the value of x never reaches this target threshold, we will call it "never pupated" (i.e. never reach infinity; will not blow up)
target_x = 30


# Parameter space that we are interested in
# I have tried np.linspace(0, 5, 500) for all three parameters, but it was overkill in our case
beta_vals = np.linspace(0.5, 1.2, 70)
alpha_vals = np.linspace(0.5, 1.2, 70)
x_stop_vals = np.linspace(1.3, 1.7, 30)
cost_matrix = np.zeros((len(beta_vals), len(alpha_vals), len(x_stop_vals)))

# create a list of all parameter combinations
param_combinations = [(beta, alpha, x_stop) for beta in beta_vals for alpha in alpha_vals for x_stop in x_stop_vals]

In [36]:
# Bootstrap the original data & do parameter optimization on the bootstrapped data

boostrapped_index =  # we boostrapped 100 times in our case, so the number went from 1 to 100 (i.e. we repeated this block 100 times with different boostrapped indexes)

data_times_bootstrapped = [Delta_dist[i][boostrapped_index] for i in range(6)]
results_bootstrapped = Parallel(n_jobs=10, batch_size='auto')(delayed(compute_cost)(beta, alpha, x_stop, data_times_bootstrapped, Delta_stdv) for beta, alpha, x_stop in tqdm(param_combinations, desc="Computing Costs"))

bootstrapped_file_directory = f'/path/to/save/cost_matrix_Bootstrap{boostrapped_index}.csv' # change to your path

# write an empty row at the beginning
with open(bootstrapped_file_directory, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['', '', '', ''])

# save each result incrementally
for result in results_bootstrapped:
    save_result(result, bootstrapped_file_directory)

# **III. Data Analysis**

After obtaining the optimal parameter set, we can use it to make predictions and compare them with the experimental results.

In [None]:
''' functions '''

# pulse function - extra signaling from the optogenetic tool
def pulse(x, A, t0, t1):
    return np.where((t0 <= x) & (x <= t1), A, 0)


# the function of our ODE
def odes(t, i, Amp, ti, tf, beta, x_stop):
    x, y = i
    F = 1 if x <= x_stop else 0
    dxdt = (1/2) * x**2 + (1/2) * y - x + pulse(t, Amp, ti, tf)
    dydt = beta * F
    return [dxdt, dydt]


# a function that calculates the 95% confidence interval for a specific duration of light exposure, and saves a CSV file
def compute_95CI(t0, beta, alpha, x_stop, odes, t_end, duration, target_x):

    y0 = 0
    x0 = 1 - np.sqrt(1 - y0)
    i0 = [x0, y0]

    solution0 = solve_ivp(
        odes, (0, t_end), i0,
        args=(0, 0, 1e-10, beta, x_stop),
        t_eval=np.linspace(0, t_end, 1000),
        max_step=0.01
    )
    
    E0 = solution0.y[0]
    if np.any(E0 >= target_x):
        target_t_0 = solution0.t[np.isfinite(E0)][np.argmax(E0 >= target_x)]
    else:
        return np.nan  # skip further calculation if target_t_0 is not found

    standard_t_0 = 4.738738738738738  # number you calculated beforehand, which is the target_t_0 by using {optimized_beta, optimized_alpha, optimized_x_stop}
    normalized_t0 = t0*target_t_0/standard_t_0

    normalized_t1 = normalized_t0 + duration*target_t_0/60
    if normalized_t1 >= target_t_0:
        return np.nan

    solution = solve_ivp(
        odes, (0, t_end), i0,
        args=(alpha, normalized_t0, normalized_t1, beta, x_stop),
        t_eval=np.linspace(0, t_end, 1000),
        max_step=0.01
    )
    x = solution.y[0]
    if np.any(x >= target_x):
        target_t = solution.t[np.isfinite(x)][np.argmax(x >= target_x)]
        return (target_t_0 - target_t) * 60 / target_t_0
    else:
        return np.nan  # return NaN if the target isn't reached

In [None]:
# read CSV files obtained from bootstrapping & optimization (code section 2)
# find the optimal parameter set at the global minimum for each bootstrapped data

min_beta = []
min_alpha = []
min_x_stop = []

for i in range(1,101):
    file = f'/path/to/read/cost_matrix_Bootstrap{i}.csv'
    if os.path.exists(file):
        data = np.genfromtxt(file, delimiter=',', skip_header=1)
        
        min_cost_index = np.argmin(np.abs(data[:, 3]))
        min_cost_row = data[min_cost_index]

        min_beta.append(min_cost_row[0])
        min_alpha.append(min_cost_row[1])
        min_x_stop.append(min_cost_row[2])

In [None]:
# initial setup

# calculate the average of optimal parameter sets, and use the average to make predictions
optimized_beta = np.mean(min_beta)
optimized_alpha = np.mean(min_alpha)
optimized_x_stop = np.mean(x_stop)
optimized_params = optimized_beta, optimized_alpha, optimized_x_stop


# initial conditions
y0 = 0
x0 = 1 - np.sqrt(1 - y0)
i0 = [x0, y0]

# start and end simulation times
t_end = 6
t_span = (0, t_end) 

target_x = 30

# find the pupation time of the control (in dimensionless time)
solution0 = solve_ivp(odes, t_span, i0, args=(0, 0, 0+1e-10, optimized_beta, optimized_x_stop), t_eval=np.linspace(0, t_end, 1000), max_step=0.01)
E0 = solution0.y[0]
if np.any(E0 >= target_x):
    target_t_0 = solution0.t[np.isfinite(E0)].max()
else:
    target_t_0 = np.inf

In [None]:
# generate predictions (e.g. Figure 3B)

scenarios = [
    (optimized_alpha, 0, target_t_0, optimized_beta, optimized_x_stop),
    (optimized_alpha, target_t_0*A2_start_L3/60, target_t_0*A2_end_L3/60, optimized_beta, optimized_x_stop),
    (optimized_alpha, 0, target_t_0*A1_end_L3/60, optimized_beta, optimized_x_stop),
    (optimized_alpha, target_t_0*B3_start_L3 /60, target_t_0*B3_end_L3/60, optimized_beta, optimized_x_stop),
    (optimized_alpha, target_t_0*B2_start_L3/60, target_t_0*B2_end_L3/60, optimized_beta, optimized_x_stop),
    (optimized_alpha, 0, target_t_0*B1_end_L3/60, optimized_beta, optimized_x_stop),
    (0, 0, 1, optimized_beta, optimized_x_stop),
    (optimized_alpha, target_t_0*C1_start_L3/60, target_t_0*C1_end_L3/60, optimized_beta, optimized_x_stop),
    (optimized_alpha, target_t_0*C2_start_L3/60, target_t_0*C2_end_L3/60, optimized_beta, optimized_x_stop),
    (optimized_alpha, target_t_0*D1_start_L3/60, target_t_0*D1_end_L3/60, optimized_beta, optimized_x_stop),
    (optimized_alpha, target_t_0*D2_start_L3/60, target_t_0*D2_end_L3/60, optimized_beta, optimized_x_stop),
    (optimized_alpha, target_t_0*D3_start_L3/60, target_t_0*D3_end_L3/60, optimized_beta, optimized_x_stop)
]


results = []
for args in scenarios:
    solution = solve_ivp(odes, t_span, i0, args=args, t_eval=np.linspace(0, t_end, 1000), max_step=0.01)
    results.append((solution.t, solution.y[0]))

prediction = []
for i, (t, x) in enumerate(results):
    if np.any(x >= target_x):
        target_t = t[np.isfinite(x)].max()
        prediction.append((target_t_0-target_t)*60/target_t_0)
        print(f"Scenario {i}: The value of t when x = {target_x} is approximately {target_t*60/target_t_0:.4f} hours, which is {(target_t_0-target_t)*60/target_t_0:.4f} hours of advancement.")
    else:
        print(f"Scenario {i}: x does not reach the value of {target_x} within the given time span.")


labels = ["continuous", "A2", "A1", "B3", "B2", "B1", "Control", "C1", "C2", "D1", "D2", "D3"]
colors = [ ] # choose your own colors!

plt.figure(figsize=(8, 6))

for (t, x), label, color in zip(results, labels, colors):
    plt.plot(t*60/target_t_0, x, label=label, color=color, marker='', linestyle='-')


plt.xlabel('t', fontsize=12)
plt.ylabel('x', fontsize=12)
plt.xlim(left=0)
plt.ylim(bottom=0, top=target_x+2)
plt.tick_params(axis='both', which='major', labelsize=10)
plt.legend()
plt.show()

In [None]:
# Generate error bar (95% confidence interval) for predictions (e.g. Figure 3C)
# This is done by using all optimal parameter sets (from bootstrapping in code section 2) to generate predictions, find the advancement in pupation times in each treatment, and then find 2.75 and 97.5 percentile among all advancements 

advancement_distributions = []

for beta, alpha, x_stop in zip(min_beta, min_alpha, min_x_stop):
    
    # all 6 treatments
    scenarios = [
        (optimized_alpha, 0, target_t_0, optimized_beta, optimized_x_stop),
        (optimized_alpha, target_t_0*A2_start_L3/60, target_t_0*A2_end_L3/60, optimized_beta, optimized_x_stop),
        (optimized_alpha, 0, target_t_0*A1_end_L3/60, optimized_beta, optimized_x_stop),
        (optimized_alpha, target_t_0*B3_start_L3 /60, target_t_0*B3_end_L3/60, optimized_beta, optimized_x_stop),
        (optimized_alpha, target_t_0*B2_start_L3/60, target_t_0*B2_end_L3/60, optimized_beta, optimized_x_stop),
        (optimized_alpha, 0, target_t_0*B1_end_L3/60, optimized_beta, optimized_x_stop),
        (optimized_alpha, target_t_0*C1_start_L3/60, target_t_0*C1_end_L3/60, optimized_beta, optimized_x_stop),
        (optimized_alpha, target_t_0*C2_start_L3/60, target_t_0*C2_end_L3/60, optimized_beta, optimized_x_stop),
        (optimized_alpha, target_t_0*D1_start_L3/60, target_t_0*D1_end_L3/60, optimized_beta, optimized_x_stop),
        (optimized_alpha, target_t_0*D2_start_L3/60, target_t_0*D2_end_L3/60, optimized_beta, optimized_x_stop),
        (optimized_alpha, target_t_0*D3_start_L3/60, target_t_0*D3_end_L3/60, optimized_beta, optimized_x_stop)
    ]
    
    # solve the ODEs for each treatment and store results
    results = []
    for args in scenarios:
        solution = solve_ivp(odes, t_span, i0, args=args, t_eval=np.linspace(0, t_end, 1000), max_step=0.01)
        results.append((solution.t, solution.y[0]))

    scenario_pred = []
    for i, (t, x) in enumerate(results):
        if np.any(x >= target_x):
            target_t = t[np.isfinite(x)].max()
            scenario_pred.append((target_t_0-target_t)*60/target_t_0) # calculate advancement
        else:
            print(f"Scenario {i}: x does not reach the value of {target_x} within the given time span.")
    
    advancement_distributions.append(scenario_pred)



# 95% confidence interval for each treatment
pred_ci_lower = []
pred_ci_upper = []

for i in range(len(advancement_distributions[0])):
    distribution = [sublist[i] for sublist in advancement_distributions]

    ci_lower = np.percentile(distribution, 2.5)
    ci_upper = np.percentile(distribution, 97.5)

    pred_ci_lower.append(ci_lower)
    pred_ci_upper.append(ci_upper)

In [None]:
# Figure 3C

categories_AB = ["continuous", "A2", "A1", "B3", "B2", "B1"]
prediction_AB = prediction[:6]
diff_means = [60-continuous_mean, 60-A2_mean, 60-A1_mean, 60-B3_mean, 60-B2_mean, 60-B1_mean]
sem_AB = [stats.sem(continuous_L3), stats.sem(A2_L3), stats.sem(A1_L3), stats.sem(B3_L3), stats.sem(B2_L3), stats.sem(B1_L3)]
colors_AB = [ ] # choose your own colors!
pred_ci_lower_AB = pred_ci_lower[:6]
pred_ci_upper_AB = pred_ci_upper[:6]


for i in range(len(diff_means)):
    plt.errorbar(diff_means[i], prediction_AB[i], xerr=sem_AB[i], yerr=[[prediction_AB[i] - pred_ci_lower_AB[i]], [pred_ci_upper_AB[i] - prediction_AB[i]]], fmt='o', ecolor=colors_AB[i], capsize=5, color=colors_AB[i], label=categories_AB[i])


x_fit = np.array([0, max(diff_means)+5])

slope, intercept, r_value, p_value, std_err = linregress(diff_means, prediction_AB)
fit_line = np.array(x_fit) * slope + intercept
r_squared = r_value**2
print(slope, intercept, r_squared)


plt.plot(x_fit, fit_line, color='dimgray', linestyle='--')
plt.xlabel("Experiment (hours)", fontsize=12)
plt.ylabel("Prediction (hours)", fontsize=12)
plt.title('Pupation Advancement in Experiment and Prediction', fontsize=12)
plt.tick_params(axis='both', which='major', labelsize=10)
plt.xlim(left=0, right=20)
plt.ylim(bottom=0, top=20)
plt.legend(prop={'size': 9})
plt.show()

In [None]:
# Figure 4A

# range of t0 values that we want to start the pulse
t0_values = np.linspace(0, t_end, 1000)

# grids for pulse durations and end times
pulse_durations = np.linspace(0, 60, 121)
end_times = np.linspace(0, 60, 121)

advancement_grid_triangle_end = np.full((len(pulse_durations), len(end_times)), np.nan)

for i, duration in tqdm(enumerate(pulse_durations)):
    for j, end_time in enumerate(end_times):
        t0 = (end_time - duration) * target_t_0 / 60
        t1 = end_time * target_t_0 / 60

        if t0 < 0 or t1 > target_t_0:
            continue

        solution = solve_ivp(
            odes,
            t_span,
            i0,
            args=(optimized_alpha, t0, t1, optimized_beta, optimized_x_stop),
            t_eval=np.linspace(0, t_end, 1000),
            max_step=0.01,
        )
        t = solution.t
        x = solution.y[0]

        if np.any(x >= target_x):  # calculate advancement
            target_t = t.max()
            advancement_grid_triangle_end[i, j] = (target_t_0 - target_t) * 60 / target_t_0


X_triangle_end, Y_triangle_end = np.meshgrid(np.linspace(0, 60, advancement_grid_triangle_end.shape[1]), np.linspace(0, 60, advancement_grid_triangle_end.shape[0]))

plt.figure(figsize=(8, 6))
contour = plt.contourf(
    X_triangle_end,
    Y_triangle_end,
    advancement_grid_triangle_end,
    levels=100,
    cmap="Greens",
)

plt.colorbar(contour, label="Advancement in Pupation Time (hrs)", pad=0.12)

contour = plt.contour(
    X_triangle_end,
    Y_triangle_end,
    advancement_grid_triangle_end,
    levels=10,
    colors='black'
)

plt.clabel(contour, inline=True, fontsize=10)

plt.title("Sensitivity Heatmap", fontsize=12)
plt.xlabel("End Time of Pulse (hrs)", fontsize=12)
plt.ylabel("Pulse Duration (hrs)", fontsize=12)
plt.gca().yaxis.set_label_position("right")
plt.gca().yaxis.tick_right()
plt.tick_params(axis="both", which="major", labelsize=10)
plt.show()

In [None]:
# Generate 95% confidence interval for a specific duration of light exposure (e.g. Figure 4B-C)
# After it is completed, we don't have to run this block again

duration = C1_end_L3 - C1_start_L3  # same for: D1_end_L3 - D1_start_L3

distributions_95CI = []

param_combinations = list(zip(min_beta, min_alpha, min_x_stop))
for t0 in tqdm(t0_values):
    
    current_distribution = Parallel(n_jobs=10, batch_size="auto")(
        delayed(compute_95CI)(t0, beta, alpha, x_stop, odes, t_end, duration, target_x)
        for beta, alpha, x_stop in param_combinations
    )
    
    distributions_95CI.append(current_distribution)


with open(f"/path/to/save/{duration}-95CI.csv", mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow([''] * len(min_beta))

    for distribution in distributions_95CI:
        writer.writerow(distribution)

In [None]:
# Read the CSV file of the 95% CI of a specific duration

# different pulse durations
C1_duration = (C1_end_L3 - C1_start_L3)*target_t_0/60
D1_duration = (D1_end_L3 - D1_start_L3)*target_t_0/60

CI_C1 = np.genfromtxt(f"/path/to/read/{C1_duration}-95CI.csv", delimiter=",", skip_header=1)
CI_D1 = np.genfromtxt(f"/path/to/read/{D1_duration}-95CI.csv", delimiter=",", skip_header=1)

pred_ci_low_C1 = []
pred_ci_up_C1 = []
pred_ci_low_D1 = []
pred_ci_up_D1 = []

for row in CI_C1:
    row = row[~np.isnan(row)]

    if len(row) > 0:
        ci_low = np.percentile(row, 2.5)
        ci_up = np.percentile(row, 97.5)
        pred_ci_low_C1.append(ci_low)
        pred_ci_up_C1.append(ci_up)

for row in CI_D1:
    row = row[~np.isnan(row)]

    if len(row) > 0:
        ci_low = np.percentile(row, 2.5)
        ci_up = np.percentile(row, 97.5)
        pred_ci_low_D1.append(ci_low)
        pred_ci_up_D1.append(ci_up)

pred_ci_low_C1 = np.array(pred_ci_low_C1)
pred_ci_up_C1 = np.array(pred_ci_up_C1)
pred_ci_low_D1 = np.array(pred_ci_low_D1)
pred_ci_up_D1 = np.array(pred_ci_up_D1)

pred_ci_low_combo = [pred_ci_low_C1, pred_ci_low_D1]
pred_ci_up_combo = [pred_ci_up_C1, pred_ci_up_D1]

In [None]:
# Figure 4B-C

pulses = [C1_duration, D1_duration]

color_sensitivity = ["black", "black"]

for duration, color, pred_ci_low, pred_ci_up in zip(pulses, color_sensitivity, pred_ci_low_combo, pred_ci_up_combo):
    target_t_values = []
    for t0 in tqdm(t0_values):
        t1 = t0 + duration
        if t1 >= target_t_0:
            target_t_values.append(np.nan)
            continue
            
        solution = solve_ivp(odes, t_span, i0, args=(optimized_alpha, t0, t1, optimized_beta, optimized_x_stop), t_eval=np.linspace(0, t_end, 1000), max_step=0.01)
        t = solution.t
        x = solution.y[0]

        if np.any(x >= target_x):
            target_t = t.max()
            target_t_values.append(target_t_0 - target_t)
        else:
            target_t_values.append(np.nan)

    # plotting advancement against end time of the pulse for each pulse duration
    normalized_t = np.array([x*60/target_t_0 for x in target_t_values])
    normalized_t = normalized_t[~np.isnan(normalized_t)]
    length = len(normalized_t)
    plt.plot((t0_values*60/target_t_0+duration*60/target_t_0)[0:length], normalized_t, color=color, marker='', linestyle='-', linewidth=1.5)
    plt.fill_between((t0_values*60/target_t_0+duration*60/target_t_0)[0:length], pred_ci_low[0:length], pred_ci_up[0:length], color='gray', edgecolor='none', alpha=0.35) 



positions = [C1_end_L3, C2_end_L3, D1_end_L3, D2_end_L3, D3_end_L3,]
data_CD = [60-C1_mean, 60-C2_mean, 60-D1_mean, 60-D2_mean, 60-D3_mean]
sem_CD = [stats.sem(C1_L3), stats.sem(C2_L3), stats.sem(D1_L3), stats.sem(D2_L3), stats.sem(D3_L3)]
categories_CD = ["C1", "C2", "D1", "D2", "D3"]
colors_CD = [ ]  # choose your colors

for i in range(len(positions)):
    plt.errorbar(positions[i], data_CD[i], yerr=sem_CD[i], fmt='o', ecolor=colors_CD[i], capsize=5, color=colors_CD[i], label=categories_CD[i])
    

plt.xlabel('End Time of Pulse (hrs)', fontsize=12)
plt.ylabel('Advancement in Pupation Time (hrs)', fontsize=12)
plt.xlim(left=15, right=60)
plt.ylim(bottom=0, top=20)
plt.tick_params(axis='both', which='major', labelsize=10)
plt.legend(loc='upper left')
plt.show()