In [1]:
import numpy as np
from scipy.optimize import minimize
import matplotlib.pyplot as plt
from datetime import datetime
import os

# File paths
file_path = '/Users/Frank/Downloads/DBA-fitting/merged_output-DBA.txt'
results_dir = os.path.dirname(file_path)
results_file_path = os.path.join(results_dir, 'dye_alone_linear_fit_results.txt')

# Input constants
d0_in_M = 1.51e-6  # Total initial concentration of dye (M)

#Fitting Thresholds
rmse_threshold_factor = 2  # Factor to multiply the RSME for acceptable fits compared to the best fit.
r2_threshold = 0.9    #R2 value used for filtering acceptable and unacceptable fits.

#################################################################################################################
# Do NOT change code after this line
#################################################################################################################

# Convert initial concentration to µM units
d0 = d0_in_M * 1e6  # d0 in µM

# Initialize parameter ranges with default boundaries
I0_lower, I0_upper = 0, None
Id_lower, Id_upper = 1e3 / 1e6, 1e18 / 1e6  # Convert Id range to µM⁻¹

# Load prediction intervals for I0 and Id from existing results file if available
if os.path.exists(results_file_path):
    try:
        with open(results_file_path, 'r') as f:
            lines = f.readlines()

        # Extract Id prediction interval from the file if available
        id_prediction_line = next((line for line in lines if 'Id prediction interval' in line), None)
        if id_prediction_line and 'not applicable' not in id_prediction_line:
            Id_lower = float(id_prediction_line.split('[')[-1].split(',')[0].strip()) / 1e6
            Id_upper = float(id_prediction_line.split(',')[-1].split(']')[0].strip()) / 1e6
        else:
            average_Id = float(next(line for line in lines if 'Average Id' in line).split('\t')[-1].strip()) / 1e6
            Id_lower, Id_upper = 0.5 * average_Id, 2.0 * average_Id

        # Extract I0 prediction interval from the file if available
        i0_prediction_line = next((line for line in lines if 'I0 prediction interval' in line), None)
        if i0_prediction_line and 'not applicable' not in i0_prediction_line:
            I0_lower = float(i0_prediction_line.split('[')[-1].split(',')[0].strip())
            I0_upper = float(i0_prediction_line.split(',')[-1].split(']')[0].strip())
        else:
            average_I0 = float(next(line for line in lines if 'Average I0' in line).split('\t')[-1].strip())
            I0_lower, I0_upper = 0.5 * average_I0, 2.0 * average_I0

    except Exception as e:
        print(f"Error parsing boundaries from the results file: {e}")
        Id_lower, Id_upper = 1e3 / 1e6, 1e18 / 1e6  # Default bounds if parsing fails
        I0_lower, I0_upper = 0, np.inf
else:
    # Set default ranges if no results file is present
    Id_lower, Id_upper = 1e3 / 1e6, 1e18 / 1e6
    I0_lower, I0_upper = 0, np.inf

# Print boundary values for verification
print(f"Loaded boundaries:\nId: [{Id_lower * 1e6:.3e}, {Id_upper * 1e6:.3e}] M⁻¹\nI0: [{I0_lower:.3e}, {I0_upper:.3e}]")

# Define function to calculate signal based on the parameters and h0 values
def compute_signal(params, h0_values, d0):
    I0, Kd, Id, Ihd = params
    Signal_values = []
    for h0 in h0_values:
        delta = h0 - d0
        a = Kd
        b = Kd * delta + 1
        c = -d0
        discriminant = b**2 - 4 * a * c

        if discriminant < 0:
            Signal_values.append(np.nan)
            continue

        sqrt_discriminant = np.sqrt(discriminant)
        d1 = (-b + sqrt_discriminant) / (2 * a)
        d2 = (-b - sqrt_discriminant) / (2 * a)

        d = d1 if d1 >= 0 else d2 if d2 >= 0 else np.nan
        if np.isnan(d):
            Signal_values.append(np.nan)
            continue

        h = d + delta
        hd = Kd * h * d
        Signal = I0 + Id * d + Ihd * hd
        Signal_values.append(Signal)

    return np.array(Signal_values)

# Function to compute residuals between observed and computed signals
def residuals(params, h0_values, Signal_observed, d0):
    Signal_computed = compute_signal(params, h0_values, d0)
    residual = Signal_observed - Signal_computed
    residual = np.nan_to_num(residual, nan=1e6)
    return residual

# Function to calculate RMSE and R² for model evaluation
def calculate_fit_metrics(Signal_observed, Signal_computed):
    rmse = np.sqrt(np.nanmean((Signal_observed - Signal_computed) ** 2))
    ss_res = np.nansum((Signal_observed - Signal_computed) ** 2)
    ss_tot = np.nansum((Signal_observed - np.nanmean(Signal_observed)) ** 2)
    r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else np.nan
    return rmse, r_squared

# Load data from the specified file path
def load_data(file_path):
    try:
        with open(file_path, 'r') as f:
            lines = f.readlines()
        return lines
    except Exception as e:
        print(f"Error loading file: {e}")
        return None

# Function to split data into replicas based on specified delimiters
def split_replicas(data):
    replicas, current_replica = [], []
    for line in data:
        if "var" in line.lower():
            if current_replica:
                replicas.append(np.array(current_replica))
                current_replica = []
        else:
            try:
                x, y = map(float, line.split())
                if x == 0.0 and current_replica:
                    replicas.append(np.array(current_replica))
                    current_replica = []
                current_replica.append((x, y))
            except ValueError:
                continue

    if current_replica:
        replicas.append(np.array(current_replica))

    return replicas if replicas else None

# Load and process data, then split it into individual replicas
data_lines = load_data(file_path)
if data_lines is None:
    raise ValueError("Data loading failed.")

replicas = split_replicas(data_lines)
if replicas is None:
    raise ValueError("Replica splitting failed.")

print(f"Number of replicas detected: {len(replicas)}")

# Iterate through each replica to perform fitting and analysis
for replica_index, replica_data in enumerate(replicas, start=1):
    print(f"Processing replica {replica_index}, data length: {len(replica_data)}")
    h0_values = replica_data[:, 0] * 1e6  # Convert h0 values to µM
    Signal_observed = replica_data[:, 1]

    # Skip replicas with insufficient data points
    if len(h0_values) < 2:
        print(f"Replica {replica_index} has insufficient data. Skipping.")
        continue

    # Update I0_upper based on minimum observed signal if it was undefined
    I0_upper = np.min(Signal_observed) if I0_upper is None or np.isinf(I0_upper) else I0_upper

    # Generate initial parameter guesses within specified bounds
    Ihd_guess_smaller = Signal_observed[0] < Signal_observed[-1]
    initial_params_list = []
    for _ in range(200):
        I0_guess = np.random.uniform(I0_lower, I0_upper)
        Kd_guess = 10 ** np.random.uniform(-5, 5)
        if Ihd_guess_smaller:
            Id_guess = 10 ** np.random.uniform(np.log10(Id_lower), np.log10(Id_upper))
            Ihd_guess = Id_guess * np.random.uniform(0.1, 0.5)
        else:
            Ihd_guess = 10 ** np.random.uniform(np.log10(Id_lower), np.log10(Id_upper))
            Id_guess = Ihd_guess * np.random.uniform(0.1, 0.5)
        initial_params_list.append([I0_guess, Kd_guess, Id_guess, Ihd_guess])

    # Perform optimization and collect results for each initial guess
    best_result, best_cost = None, np.inf
    fit_results = []
    for initial_params in initial_params_list:
        result = minimize(lambda params: np.sum(residuals(params, h0_values, Signal_observed, d0) ** 2),
                          initial_params, method='L-BFGS-B',
                          bounds=[(I0_lower, I0_upper), (1e-8, 1e8), (Id_lower, Id_upper), (1e-8, 1e8)])

        Signal_computed = compute_signal(result.x, h0_values, d0)
        rmse, r_squared = calculate_fit_metrics(Signal_observed, Signal_computed)
        fit_results.append((result.x, result.fun, rmse, r_squared))

        if result.fun < best_cost:
            best_cost = result.fun
            best_result = result

    # Filter fit results based on RMSE and R² thresholds
    best_rmse = min(fit_rmse for _, _, fit_rmse, _ in fit_results)
    rmse_threshold = best_rmse * rmse_threshold_factor
    #r2_threshold = 0.9

    filtered_results = [
        (params, fit_rmse, fit_r2) for params, _, fit_rmse, fit_r2 in fit_results
        if fit_rmse <= rmse_threshold and fit_r2 >= r2_threshold
    ]

    # Calculate the median of the filtered parameters if they exist
    if filtered_results:
        median_params = np.median(np.array([result[0] for result in filtered_results]), axis=0)
    else:
        print("Warning: No fits meet the filtering criteria.")
        continue

    # Compute signal and metrics for the median parameters
    Signal_computed = compute_signal(median_params, h0_values, d0)
    rmse, r_squared = calculate_fit_metrics(Signal_observed, Signal_computed)

    # Generate points for the fitting curve to overlay on observed data
    fitting_curve_x, fitting_curve_y = [], []
    for i in range(len(h0_values) - 1):
        extra_points = np.linspace(h0_values[i], h0_values[i + 1], 21)
        fitting_curve_x.extend(extra_points)
        fitting_curve_y.extend(compute_signal(median_params, extra_points, d0))

    last_signal = compute_signal(median_params, [h0_values[-1]], d0)[0]
    fitting_curve_x.append(h0_values[-1])
    fitting_curve_y.append(last_signal)

    # Plot observed vs. simulated fitting curve
    plt.figure(figsize=(8, 6))
    plt.plot(h0_values, Signal_observed, 'o', label='Observed Signal')
    plt.plot(fitting_curve_x, fitting_curve_y, '--', color='blue', alpha=0.6, label='Simulated Fitting Curve')
    plt.xlabel('h0 (µM)')
    plt.ylabel('Signal')
    plt.title(f'Observed vs. Simulated Fitting Curve for Replica {replica_index}')
    plt.legend()
    plt.grid(True)

    param_text = (f"Kd: {median_params[1] * 1e6:.2e} M^-1\n"
                  f"I0: {median_params[0]:.2e}\n"
                  f"Id: {median_params[2] * 1e6:.2e} signal/M\n"
                  f"Ihd: {median_params[3] * 1e6:.2e} signal/M\n"
                  f"RMSE: {rmse:.3f}\n"
                  f"R²: {r_squared:.3f}")

    plt.gca().annotate(param_text, xy=(1.05, 0.5), xycoords='axes fraction', fontsize=10,
                       bbox=dict(boxstyle="round,pad=0.3", edgecolor="black", facecolor="lightgrey"))

    plot_file = os.path.join(results_dir, f"fit_plot_replica_{replica_index}.png")
    plt.savefig(plot_file, bbox_inches='tight')
    plt.show()

    # Calculate RMSE and R² for the median fit
    median_rmse, median_r2 = calculate_fit_metrics(Signal_observed, Signal_computed)

    # Export fit parameters and results to a text file
    replica_file = os.path.join(results_dir, f"fit_results_replica_{replica_index}.txt")
    with open(replica_file, 'w') as f:
        f.write(f"Input:\nd0 (M): {d0_in_M:.6e}\n")
        f.write(f"Id lower bound: {Id_lower * 1e6:.3e} signal/M\n")
        f.write(f"Id upper bound: {Id_upper * 1e6:.3e} signal/M\n")
        f.write(f"I0 lower bound: {I0_lower:.3e}\n")
        f.write(f"I0 upper bound: {I0_upper:.3e}\n")

        f.write("\nOutput:\nMedian parameters:\n")
        f.write(f"I0: {median_params[0]:.2e}\n")
        f.write(f"Kd: {median_params[1] * 1e6:.2e} M^-1\n")
        f.write(f"Id: {median_params[2] * 1e6:.2e} signal/M\n")
        f.write(f"Ihd: {median_params[3] * 1e6:.2e} signal/M\n")
        f.write(f"RMSE: {median_rmse:.3f}\n")
        f.write(f"R²: {median_r2:.3f}\n")

        # Export only acceptable filtered fit parameters
        f.write("\nAcceptable Fit Parameters:\n")
        f.write("Kd (M^-1)\tI0\tId (signal/M)\tIhd (signal/M)\tRMSE\tR²\n")
        for params, fit_rmse, fit_r2 in filtered_results:
            f.write(f"{params[1] * 1e6:.2e}\t{params[0]:.2e}\t{params[2] * 1e6:.2e}\t{params[3] * 1e6:.2e}\t{fit_rmse:.3f}\t{fit_r2:.3f}\n")

        # Write the standard deviations
        f.write("\nStandard Deviations:\n")
        f.write(f"Kd Std Dev: {Kd_std:.2e} M^-1\n")
        f.write(f"I0 Std Dev: {I0_std:.2e}\n")
        f.write(f"Id Std Dev: {Id_std:.2e} signal/M\n")
        f.write(f"Ihd Std Dev: {Ihd_std:.2e} signal/M\n")

        # Write the input signal data
        f.write("\nOriginal Data:\nConcentration (M)\tSignal\n")
        for h0, signal in zip(h0_values / 1e6, Signal_observed):  # Convert h0_values to M
            f.write(f"{h0:.6e}\t{signal:.6e}\n")

        # Write the fitting curve data
        f.write("\nFitting Curve:\n")
        f.write("Simulated Concentration (M)\tSimulated Signal\n")
        for x_sim, y_sim in zip(np.array(fitting_curve_x) / 1e6, fitting_curve_y):  # Convert fitting_curve_x to M
            f.write(f"{x_sim:.6e}\t{y_sim:.6e}\n")
        f.write(f"\nDate of Export: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")   # Exports time stamp

    print(f"Results for Replica {replica_index} saved to {replica_file}")

Loaded boundaries:
Id: [1.000e+03, 1.000e+18] M⁻¹
I0: [0.000e+00, inf]
Error loading file: [Errno 2] No such file or directory: '/Users/Frank/Downloads/DBA-fitting/merged_output-DBA.txt'


ValueError: Data loading failed.