In [None]:
import pandas as pd
import numpy as np
import datetime
from dateutil.parser import parse
import seaborn as sns
import matplotlib.pyplot as plt

df_raw = pd.read_excel("wt_cellcycle_durations.xlsx", header=None)

# assign column names and set index
df_raw.columns = ['Division'] + list(range(1, df_raw.shape[1]))
df_raw.set_index('Division', inplace=True)

# convert to hours
def value_to_hours(val):
    if pd.isna(val):
        return np.nan

    # datetime.datetime or pandas Timestamp
    if isinstance(val, (datetime.datetime, pd.Timestamp)):
        return val.hour + val.minute / 60 + val.second / 3600

    # excel sometimes uses datetime.time
    if isinstance(val, datetime.time):
        return val.hour + val.minute / 60 + val.second / 3600

    # excel may store durations as fraction of a day
    if isinstance(val, (float, int)) and 0 <= val < 1:
        return val * 24

    return np.nan

df_hours = df_raw.applymap(value_to_hours)

# clean index — extract digits from labels
# reset_index() gives us "Division" as a column
df_hours = df_hours.reset_index()

# extract the number from "Div 1", "Div2", etc.
df_hours["Division Number"] = df_hours["Division"].str.extract(r"(\d+)")[0].astype(float)

# set cleaned index
df_hours = df_hours.set_index("Division Number")
df_hours = df_hours.drop(columns=["Division"])


In [None]:
df_hours

In [None]:
# compute mean and standard deviation for each division
mean_durations = df_hours.mean(axis=1)
std_durations = df_hours.std(axis=1)
summary = pd.DataFrame({
    "Mean Duration (hrs)": mean_durations,
    "Std Dev (hrs)": std_durations
})

print(summary)

In [None]:
# melt the wide-format DataFrame to long format
df_long = df_hours.reset_index().melt(id_vars="Division Number",
                                      var_name="Cell",
                                      value_name="Duration (hrs)")
df_long = df_long.dropna()

# compute summary stats
summary = df_long.groupby("Division Number")["Duration (hrs)"].agg(["mean", "std"])

# get the unique division numbers in the order Seaborn will plot them
division_order = sorted(df_long["Division Number"].unique())

# map division numbers to their categorical x positions
x_positions = list(range(len(division_order)))
mean_values = summary.loc[division_order, "mean"]
std_values = summary.loc[division_order, "std"]


plt.figure(figsize=(10, 6))
sns.stripplot(data=df_long, x="Division Number", y="Duration (hrs)",
              jitter=True, alpha=0.6, color='black', order=division_order)
plt.errorbar(x_positions, mean_values, yerr=std_values,
             fmt='o', color='red', capsize=4, label="Mean ± SD")

# final plot setup
plt.title("WT Neuroblast Raw Pooled Cell Cycle Durations")
plt.grid(True, linestyle='--', alpha=0.5)
plt.xlabel("Division Number")
plt.ylabel("Cell Cycle Duration (hrs)")
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
# need to normalize to first division time
df_norm = df_hours.copy()
for cell in df_norm.columns:
    first_div = df_norm[cell].dropna().iloc[0]
    df_norm[cell] = df_norm[cell] / first_div

mean_durations = df_norm.mean(axis=1)
std_durations = df_norm.std(axis=1)
summary = pd.DataFrame({
    "Mean Duration (hrs)": mean_durations,
    "Std Dev (hrs)": std_durations
})

print(summary)


In [None]:
df_long = df_norm.reset_index().melt(id_vars="Division Number", 
                                      var_name="Cell", 
                                      value_name="Duration (hrs)")
df_long = df_long.dropna()

summary = df_long.groupby("Division Number")["Duration (hrs)"].agg(["mean", "std"])

division_order = sorted(df_long["Division Number"].unique())

x_positions = list(range(len(division_order)))
mean_values = summary.loc[division_order, "mean"]
std_values = summary.loc[division_order, "std"]

plt.figure(figsize=(10, 6))
sns.stripplot(data=df_long, x="Division Number", y="Duration (hrs)",
              jitter=True, alpha=0.6, color='black', order=division_order)
plt.errorbar(x_positions, mean_values, yerr=std_values,
             fmt='o', color='red', capsize=4, label="Mean ± SD")

plt.title("WT Neuroblast Raw Pooled Cell Cycle Durations")
plt.grid(True, linestyle='--', alpha=0.5)
plt.xlabel("Division Number")
plt.ylabel("Cell Cycle Duration (hrs)")
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
# Candidate functions
def linear(x, a, b):
    return a * x + b

def logarithmic(x, a, b):
    return a * np.log(x) + b

def quadratic(x, a, b, c):
    return a * x**2 + b * x + c

def cubic(x, a, b, c, d):
    return a * x**3 + b * x**2 + c * x + d

def exponential_decay(x, a, b, c):
    return a * np.exp(-b * x) + c

# R² scoring
def r_squared(y_true, y_pred):
    ss_res = np.sum((y_true - y_pred) ** 2)
    ss_tot = np.sum((y_true - np.mean(y_true)) ** 2)
    return 1 - (ss_res / ss_tot)

Fit models to each cell's profile individually

In [None]:
from collections import defaultdict

per_cell_fit_results = defaultdict(dict)

for cell in df_norm.columns:
    y = df_norm[cell].dropna()
    x = y.index.astype(float)
    y = y.values

    if len(x) >= 3: # enough points to fit 2 parameter models without default R^2 being 1
        for model_name, func in [("Linear", linear), ("Logarithmic", logarithmic)]:
                try:
                    params, _ = curve_fit(func, x, y)
                    y_pred = func(x, *params)
                    r2 = r_squared(y, y_pred)
                    per_cell_fit_results[model_name][cell] = {
                        "params": params,
                        "r2": r2
                    }
                    continue
                except:
                    continue

    if len(x) >=4: # enough points to fit quadratic, exponential decay, and cubic
        for model_name, func in [("Quadratic", quadratic), ("Exponential Decay", exponential_decay), ("Cubic", cubic)]:
            try:
                params, _ = curve_fit(func, x, y)
                y_pred = func(x, *params)
                r2 = r_squared(y, y_pred)
                per_cell_fit_results[model_name][cell] = {
                    "params": params,
                    "r2": r2
                }
            except:
                continue

In [None]:
def plot_cell_best_and_all_models(cell_id):
    # observed
    y_series = df_norm[cell_id].dropna()
    if y_series.empty:
        print(f"No data for cell {cell_id}")
        return
    x = y_series.index.astype(float).values
    y = y_series.values

    # models available for this cell
    available = {name: res[cell_id]
                 for name, res in per_cell_fit_results.items()
                 if cell_id in res and np.isfinite(res[cell_id].get("r2", np.nan))}

    if not available:
        print(f"No fitted models for cell {cell_id}, too few points")
        return

    # function lookup
    name_to_func = {
        "Linear": linear,
        "Logarithmic": logarithmic,
        "Quadratic": quadratic,
        "Cubic": cubic,
        "Exponential Decay": exponential_decay,
    }

    # pick best by R²
    best_name, best_res = max(available.items(), key=lambda kv: kv[1]["r2"])

    # plot
    import matplotlib.pyplot as plt
    xfit = np.linspace(x.min(), x.max(), 200)
    plt.figure(figsize=(6, 4))
    plt.plot(x, y, 'o-', label="Observed")

    # plot all models; highlight best
    for name, res in available.items():
        func = name_to_func[name]
        yfit = func(xfit, *res["params"])
        is_best = (name == best_name)
        plt.plot(
            xfit, yfit, '--',
            linewidth=2.5 if is_best else 1.2,
            alpha=1.0 if is_best else 0.6,
            label=f"{name} (R²={res['r2']:.3f})",
            zorder=3 if is_best else 2
        )

    plt.title(f"Cell {cell_id}: best = {best_name} (R²={best_res['r2']:.3f})")
    plt.xlabel("Division Number")
    plt.ylabel("Duration (relative to first division)")
    plt.grid(True, linestyle='--', alpha=0.5)
    plt.legend()
    plt.tight_layout()
    plt.show()

for cid in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25]:
    if cid in df_norm.columns:
        plot_cell_best_and_all_models(cid)

fit models to pooled global data

In [None]:
def fit_global_model(df_hours, func, min_points=3, p0=None, maxfev=10000):
    """
    Fit a single model to all points from cells with >= min_points.
    Returns dict with params, R2, included_cells, and the stacked x/y used.
    """
    x_all, y_all, included_cells = [], [], []
    for cell in df_hours.columns:
        y_series = df_hours[cell].dropna()
        if len(y_series) >= min_points:
            x_vals = y_series.index.values.astype(float)
            y_vals = y_series.values
            x_all.extend(x_vals)
            y_all.extend(y_vals)
            included_cells.append(cell)

    x_all = np.asarray(x_all, dtype=float)
    y_all = np.asarray(y_all, dtype=float)

    if x_all.size == 0:
        raise ValueError(f"No cells met min_points={min_points}")

    params, _ = curve_fit(func, x_all, y_all, p0=p0, maxfev=maxfev)
    y_hat = func(x_all, *params)
    R2 = r_squared(y_all, y_hat)

    return {
        "params": params,
        "R2": float(R2),
        "included_cells": included_cells,
        "x_all": x_all,
        "y_all": y_all,
    }

def plot_global_fit(result, func, model_name="Model", color="tab:red"):
    """
    Visualize the global fit: scatter of all included points + fitted curve.
    """
    x_all = result["x_all"]
    y_all = result["y_all"]
    params = result["params"]
    R2 = result["R2"]

    xfit = np.linspace(np.min(x_all), np.max(x_all), 300)
    yfit = func(xfit, *params)

    plt.figure(figsize=(8,5))
    plt.scatter(x_all, y_all, s=20, alpha=0.6, label="Included points", color="black")
    plt.plot(xfit, yfit, "--", lw=2, color=color, label=f"{model_name} fit")
    # params string (short and robust)
    ptxt = ", ".join([f"{p:.3g}" for p in params])
    plt.title(f"{model_name}: R²={R2:.3f} | params=[{ptxt}]")
    plt.xlabel("Division Number")
    plt.ylabel("Duration (relative to first division)")
    plt.grid(True, ls="--", alpha=0.4)
    plt.legend()
    plt.tight_layout()
    plt.show()

In [None]:
res_lin = fit_global_model(df_norm, linear, min_points=3)
plot_global_fit(res_lin, linear, model_name="Linear")
print("Included cells (linear ≥3):", res_lin["included_cells"])

In [None]:
res_lin = fit_global_model(df_norm, logarithmic, min_points=3)
plot_global_fit(res_lin, logarithmic, model_name="Logarithmic")
print("Included cells (logarithmic ≥3):", res_lin["included_cells"])

In [None]:
res_lin = fit_global_model(df_norm, quadratic, min_points=3)
plot_global_fit(res_lin, quadratic, model_name="Quadratic")
print("Included cells (quadratic ≥4):", res_lin["included_cells"])

In [None]:
# crude p0: (a, b, c)
y_med = float(np.nanmedian(df_norm.values))
p0_exp = [max(0.1, np.nanmax(df_norm.values) - y_med), 0.3, y_med]

res_exp = fit_global_model(df_norm, exponential_decay, min_points=4, p0=p0_exp)
plot_global_fit(res_exp, exponential_decay, model_name="Exponential Decay")
print("Included cells (exponential decay ≥4):", res_lin["included_cells"])

In [None]:
res_lin = fit_global_model(df_norm, cubic, min_points=5)
plot_global_fit(res_lin, cubic, model_name="Cubic")
print("Included cells (cubic ≥5):", res_lin["included_cells"])

## I think the logarithmic model has the best balance of complexity and performance here

---
Repeat for mudmut

In [None]:
df_raw = pd.read_excel("mudmut_cellcycle_durations.xlsx", header=None)

df_raw.columns = ['Division'] + list(range(1, df_raw.shape[1]))
df_raw.set_index('Division', inplace=True)

def value_to_hours(val):
    if pd.isna(val):
        return np.nan

    if isinstance(val, (datetime.datetime, pd.Timestamp)):
        return val.hour + val.minute / 60 + val.second / 3600

    if isinstance(val, datetime.time):
        return val.hour + val.minute / 60 + val.second / 3600

    if isinstance(val, (float, int)) and 0 <= val < 1:
        return val * 24

    return np.nan

df_hours = df_raw.applymap(value_to_hours)

df_hours = df_hours.reset_index()

df_hours["Division Number"] = df_hours["Division"].str.extract(r"(\d+)")[0].astype(float)

df_hours = df_hours.dropna(subset=["Division Number"])
df_hours["Division Number"] = df_hours["Division Number"].astype(int)

df_hours = df_hours.set_index("Division Number")
df_hours = df_hours.drop(columns=["Division"])
df_hours

In [None]:
# normalize again
df_norm = df_hours.copy()
for cell in df_norm.columns:
    first_div = df_norm[cell].dropna().iloc[0]
    df_norm[cell] = df_norm[cell] / first_div

mean_durations = df_norm.mean(axis=1)
std_durations = df_norm.std(axis=1)

summary = pd.DataFrame({
    "Mean Duration (hrs)": mean_durations,
    "Std Dev (hrs)": std_durations
})

print(summary)

In [None]:
df_long = df_norm.reset_index().melt(id_vars="Division Number", 
                                      var_name="Cell", 
                                      value_name="Duration (hrs)")
df_long = df_long.dropna()

summary = df_long.groupby("Division Number")["Duration (hrs)"].agg(["mean", "std"])

division_order = sorted(df_long["Division Number"].unique())

x_positions = list(range(len(division_order)))
mean_values = summary.loc[division_order, "mean"]
std_values = summary.loc[division_order, "std"]

plt.figure(figsize=(10, 6))
sns.stripplot(data=df_long, x="Division Number", y="Duration (hrs)",
              jitter=True, alpha=0.6, color='black', order=division_order)
plt.errorbar(x_positions, mean_values, yerr=std_values,
             fmt='o', color='red', capsize=4, label="Mean ± SD")

plt.title("mudmut Neuroblast Raw Pooled Cell Cycle Durations")
plt.grid(True, linestyle='--', alpha=0.5)
plt.xlabel("Division Number")
plt.ylabel("Cell Cycle Duration (hrs)")
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
from collections import defaultdict

per_cell_fit_results = defaultdict(dict)

for cell in df_norm.columns:
    y = df_norm[cell].dropna()
    x = y.index.astype(float)
    y = y.values

    if len(x) >= 3: # enough points to fit 2 parameter models without default R^2 being 1
        for model_name, func in [("Linear", linear), ("Logarithmic", logarithmic)]:
                try:
                    params, _ = curve_fit(func, x, y)
                    y_pred = func(x, *params)
                    r2 = r_squared(y, y_pred)
                    per_cell_fit_results[model_name][cell] = {
                        "params": params,
                        "r2": r2
                    }
                    continue
                except:
                    continue

    if len(x) >=4: # enough points to fit quadratic, exponential decay, and cubic
        for model_name, func in [("Quadratic", quadratic), ("Exponential Decay", exponential_decay), ("Cubic", cubic)]:
            try:
                params, _ = curve_fit(func, x, y)
                y_pred = func(x, *params)
                r2 = r_squared(y, y_pred)
                per_cell_fit_results[model_name][cell] = {
                    "params": params,
                    "r2": r2
                }
            except:
                continue

In [None]:
for cid in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]:
    if cid in df_norm.columns:
        plot_cell_best_and_all_models(cid)

global fits:

In [None]:
res_lin = fit_global_model(df_norm, linear, min_points=3)
plot_global_fit(res_lin, linear, model_name="Linear")
print("Included cells (linear ≥3):", res_lin["included_cells"])

In [None]:
res_lin = fit_global_model(df_norm, logarithmic, min_points=3)
plot_global_fit(res_lin, logarithmic, model_name="Logarithmic")
print("Included cells (logarithmic ≥3):", res_lin["included_cells"])

In [None]:
res_lin = fit_global_model(df_norm, quadratic, min_points=3)
plot_global_fit(res_lin, quadratic, model_name="Quadratic")
print("Included cells (quadratic ≥4):", res_lin["included_cells"])

In [None]:
# crude p0: (a, b, c)
y_med = float(np.nanmedian(df_norm.values))
p0_exp = [max(0.1, np.nanmax(df_norm.values) - y_med), 0.3, y_med]

res_exp = fit_global_model(df_norm, exponential_decay, min_points=4, p0=p0_exp)
plot_global_fit(res_exp, exponential_decay, model_name="Exponential Decay")
print("Included cells (exponential decay ≥4):", res_lin["included_cells"])

In [None]:
res_lin = fit_global_model(df_norm, cubic, min_points=5)
plot_global_fit(res_lin, cubic, model_name="Cubic")
print("Included cells (cubic ≥5):", res_lin["included_cells"])

### with 4 cells it is hard to say anything conclusive imo