In [7]:
### Here is some of the code I used for my final year Chromsome-Condensation project - Taylor-Jai O'Connor ###

import os
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit

# Initial configuration for lengths per bead count
initial_lengths = {
    250: 1906,
    500: 2515,
    1000: 3318,
}

# Initial guesses for xeqm and tau
initial_guesses = {
    250: {'length': (1000, 100), 'width': (500, 10), 'depth': (270, 40)},
    500: {'length': (1800, 575), 'width': (750, 40), 'depth': (400, 20)},
    1000: {'length': (1800, 1200), 'width': (1000, 400), 'depth': (600, 50)},
}

def exp_decay(t, xeq, tau, x0):
    return xeq + (x0 - xeq) * np.exp(-t / tau)

def compute_average_dimensions(coords):
    cov_matrix = np.cov(coords.T)
    eigenvalues = np.linalg.eigvalsh(cov_matrix)
    eigenvalues = np.sort(eigenvalues)[::-1]
    return 2 * np.sqrt(eigenvalues)  # length, width, depth, in descending order

def fit_decay(time, values, bead_count, dimension):
    x0 = initial_lengths[bead_count]
    xeq_guess, tau_guess = initial_guesses[bead_count][dimension]

    def decay_model(t, xeq, tau):
        return exp_decay(t, xeq, tau, x0)

    try:
        popt, _ = curve_fit(
            decay_model, np.array(time), np.array(values),
            p0=[xeq_guess, tau_guess],
            bounds=([0, 1], [x0, np.inf]),
            maxfev=20000
        )
        return x0, popt[0], popt[1]
    except Exception as e:
        print(f"Fit error for {dimension}, {bead_count} beads: {e}")
        return x0, np.nan, np.nan

def process_replicate(file_path, rep_idx):
    dims = {}
    try:
        print(f"Reading replicate {rep_idx}: {file_path}")
        df = pd.read_csv(file_path, delim_whitespace=True, usecols=['time(s)', 'Type(bead)', 'x(nm)', 'y(nm)', 'z(nm)'])
        df = df[df['Type(bead)'] == 'N']
        time_points = sorted(df['time(s)'].unique())

        dims['time'] = []
        dims['length'] = []
        dims['width'] = []
        dims['depth'] = []

        for t in time_points:
            coords = df[df['time(s)'] == t][['x(nm)', 'y(nm)', 'z(nm)']].values
            length, width, depth = compute_average_dimensions(coords)
            dims['time'].append(t)
            dims['length'].append(length)
            dims['width'].append(width)
            dims['depth'].append(depth)

        print(f"Finished processing replicate {rep_idx} ({len(time_points)} timepoints)")

    except Exception as e:
        print(f"Error processing {file_path}: {e}")
    return pd.DataFrame(dims)

def plot_fitted_curve(time, values, xeq, tau, bead_count, dimension, output_path):
    x0 = initial_lengths[bead_count]
    plt.figure(figsize=(8, 6))
    plt.plot(time, values, label='Mean Data')
    if not np.isnan(xeq) and not np.isnan(tau):
        fitted = exp_decay(np.array(time), xeq, tau, x0)
        plt.plot(time, fitted, '--', label=f'Exp Fit\nxeq={xeq:.1f}, tau={tau:.1f}')
    plt.xlabel('Time (s)')
    plt.ylabel(f'{dimension.capitalize()} (nm)')
    plt.title(f'Average {dimension.capitalize()} vs Time\n{bead_count} Beads')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(output_path)
    plt.close()
    print(f" Plot saved to: {output_path}")

def main():
    base_dir = r""
    output_dir = r""
    os.makedirs(output_dir, exist_ok=True)

    bead_counts = [1000, 250, 500]  # Start with 1000, get the largest files out of the way first
    dimensions = ['length', 'width', 'depth']

    for bead_count in bead_counts:
        print(f"\n Starting processing for {bead_count} beads...")
        bead_start = time.time()

        replicate_paths = [
            os.path.join(base_dir, str(bead_count), f'replicate_{i}', '_msd_bead_coord.txt')
            for i in range(1, 11)
        ]

        time_list = None
        dim_accum = {dim: [] for dim in dimensions}

        for rep_idx, file_path in enumerate(replicate_paths, 1):
            if not os.path.isfile(file_path):
                print(f"Missing replicate {rep_idx}: {file_path}")
                continue

            rep_start = time.time()
            df = process_replicate(file_path, rep_idx)
            if df.empty:
                continue

            if time_list is None:
                time_list = df['time'].values

            for dim in dimensions:
                dim_accum[dim].append(df[dim].values)

            print(f"Replicate {rep_idx} completed in {time.time() - rep_start:.2f} sec")

        if time_list is None:
            print(f"No usable data for {bead_count} beads. Skipping.")
            continue

        time_array = np.array(time_list)
        for dim in dimensions:
            print(f"Plotting: {dim} dimension for {bead_count} beads")
            try:
                dim_matrix = np.array(dim_accum[dim], dtype=object)
                dim_matrix = np.stack(dim_matrix)  # Truncated timepoints, make sure dimensions of all matrices are the same for easy plotting
                mean_values = np.mean(dim_matrix, axis=0)
                x0, xeq, tau = fit_decay(time_array, mean_values, bead_count, dim)

                plot_path = os.path.join(output_dir, f'{bead_count}_{dim}_fit.png')
                plot_fitted_curve(time_array, mean_values, xeq, tau, bead_count, dim, plot_path)
            except Exception as e:
                print(f"Error in plotting for {dim}, {bead_count} beads: {e}")

        print(f"Completed processing for {bead_count} beads in {time.time() - bead_start:.2f} sec")

main()



🔄 Starting processing for 1000 beads...
Reading replicate 1: D:\Project\LE\Beads\1000\replicate_1\_msd_bead_coord.txt
✅ Finished processing replicate 1 (30402 timepoints)
⏱️ Replicate 1 completed in 1457.08 sec
Reading replicate 2: D:\Project\LE\Beads\1000\replicate_2\_msd_bead_coord.txt
✅ Finished processing replicate 2 (27371 timepoints)
⏱️ Replicate 2 completed in 965.27 sec
Reading replicate 3: D:\Project\LE\Beads\1000\replicate_3\_msd_bead_coord.txt
✅ Finished processing replicate 3 (26563 timepoints)
⏱️ Replicate 3 completed in 973.27 sec
Reading replicate 4: D:\Project\LE\Beads\1000\replicate_4\_msd_bead_coord.txt
✅ Finished processing replicate 4 (30402 timepoints)
⏱️ Replicate 4 completed in 1186.06 sec
Reading replicate 5: D:\Project\LE\Beads\1000\replicate_5\_msd_bead_coord.txt
✅ Finished processing replicate 5 (30402 timepoints)
⏱️ Replicate 5 completed in 1118.00 sec
Reading replicate 6: D:\Project\LE\Beads\1000\replicate_6\_msd_bead_coord.txt
✅ Finished processing replic

KeyboardInterrupt: 

In [9]:
# 🔁 Truncate 1000 bead replicate data to shortest common time length and reprocess

bead_count = 1000
dimensions = ['length', 'width', 'depth']

output_dir = r"D:\Project\LE_beads"
base_dir = r"D:\Project\LE\Beads"  # Make sure this matches your actual path

print(f"\n🔧 Reprocessing {bead_count} beads using truncated timepoints...")

replicate_paths = [
    os.path.join(base_dir, str(bead_count), f'replicate_{i}', '_msd_bead_coord.txt')
    for i in range(1, 11)
]

dim_accum = {dim: [] for dim in dimensions}
lengths = []
time_ref = None  # To get truncated time axis

for rep_idx, file_path in enumerate(replicate_paths, 1):
    if not os.path.isfile(file_path):
        print(f"❌ Missing file: {file_path}")
        continue

    df = process_replicate(file_path, rep_idx)
    if df.empty:
        print(f"⚠️ Skipping empty replicate {rep_idx}")
        continue

    lengths.append(len(df['time']))

    if time_ref is None or len(df['time']) < len(time_ref):
        time_ref = df['time'].values  # Keep shortest

    for dim in dimensions:
        dim_accum[dim].append(df[dim].values)

# Truncate to minimum time length
min_length = min(lengths)
print(f"✂️ Truncating all replicate data to {min_length} timepoints...")

truncated_data = {
    dim: [rep[:min_length] for rep in dim_accum[dim]]
    for dim in dimensions
}
time_array = time_ref[:min_length]

# Fit and plot again
for dim in dimensions:
    print(f"📊 Fitting and plotting: {dim} dimension for {bead_count} beads")
    try:
        dim_matrix = np.array(truncated_data[dim])
        mean_values = np.mean(dim_matrix, axis=0)
        x0, xeq, tau = fit_decay(time_array, mean_values, bead_count, dim)

        plot_path = os.path.join(output_dir, f'{bead_count}_{dim}_fit_truncated.png')
        plot_fitted_curve(time_array, mean_values, xeq, tau, bead_count, dim, plot_path)
        print(f"✅ Saved plot: {plot_path}")
    except Exception as e:
        print(f"❌ Error in fitting/plotting for {dim}, {bead_count} beads: {e}")



🔧 Reprocessing 1000 beads using truncated timepoints...
Reading replicate 1: D:\Project\LE\Beads\1000\replicate_1\_msd_bead_coord.txt
✅ Finished processing replicate 1 (30402 timepoints)
Reading replicate 2: D:\Project\LE\Beads\1000\replicate_2\_msd_bead_coord.txt
✅ Finished processing replicate 2 (27371 timepoints)
Reading replicate 3: D:\Project\LE\Beads\1000\replicate_3\_msd_bead_coord.txt
✅ Finished processing replicate 3 (26563 timepoints)
Reading replicate 4: D:\Project\LE\Beads\1000\replicate_4\_msd_bead_coord.txt
✅ Finished processing replicate 4 (30402 timepoints)
Reading replicate 5: D:\Project\LE\Beads\1000\replicate_5\_msd_bead_coord.txt
✅ Finished processing replicate 5 (30402 timepoints)
Reading replicate 6: D:\Project\LE\Beads\1000\replicate_6\_msd_bead_coord.txt
✅ Finished processing replicate 6 (21614 timepoints)
Reading replicate 7: D:\Project\LE\Beads\1000\replicate_7\_msd_bead_coord.txt
✅ Finished processing replicate 7 (30402 timepoints)
Reading replicate 8: D:\Pr