In [1]:
#Beating rate, 90% or 80% contraction amplitude, dp/dt, TTP, time to 90% relaxtion, tau
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import find_peaks, butter, filtfilt,firwin
from scipy.integrate import simps
import os
from scipy.optimize import curve_fit
from glob import glob

In [2]:
def compute_signal_gradient(signal, time_vector, display=False, label='Signal', save_path=None):
    """
    Compute the first derivative (gradient) of a signal over time.
    
    Parameters:
        signal (array): The signal to differentiate.
        time_vector (array): Time points corresponding to the signal.
        display (bool): If True, plot the signal and its gradient.
        label (str): Label for the signal in the plot (optional).
        
    Returns:
        gradient (array): The gradient of the signal.
    """
    gradient = np.gradient(signal, time_vector)

    if display or save_path:
        fig, axs = plt.subplots(2, 1, figsize=(12, 8), sharex=True)

        axs[0].plot(time_vector, signal, label=label, color='green')
        axs[0].set_ylabel('Signal Amplitude')
        axs[0].set_title('Original Signal')
        axs[0].legend()
        axs[0].grid(True)

        axs[1].plot(time_vector, gradient, label='Gradient', color='purple')
        axs[1].set_xlabel('Time (s)')
        axs[1].set_ylabel('d(signal)/dt')
        axs[1].set_title('Signal Gradient (1st Derivative)')
        axs[1].legend()
        axs[1].grid(True)

        plt.tight_layout()
        if save_path:
            plt.savefig(save_path)
            plt.close()
        else:
            plt.show()

    return gradient

In [3]:
def detect_contraction_rise_points(signal, time_vector, gradient_prominence=2, min_peak_distance=20, display=False, save_path=None):
    """
    Detect contraction rise points by finding gradient peaks and preceding zero-crossings (end of relaxation).

    Parameters:
        signal (array): Filtered signal.
        time_vector (array): Corresponding time in seconds.
        gradient_prominence (float): Prominence threshold for gradient peaks.
        min_peak_distance (int): Minimum distance between peaks (in samples).
        display (bool): If True, plot the signal with peaks and end-of-relaxation points.
        save_path (str): If provided, saves the plot to the specified path.

    Returns:
        zero_crossings (array): Indices of zero-crossings before each gradient peak.
    """

    # Step 1: Compute gradient
    gradient = np.gradient(signal, time_vector)

    # Step 2: Detect peaks in gradient
    grad_peaks, _ = find_peaks(gradient, prominence=gradient_prominence, distance=min_peak_distance)

    # Step 3: Find zero-crossing (gradient <= 0) before each peak
    zero_crossings = []
    for peak_idx in grad_peaks:
        for i in range(peak_idx - 1, 0, -1):
            if gradient[i] <= 0:
                zero_crossings.append(i)
                break

    # Step 4: Plot if requested
    if display or save_path:
        plt.figure(figsize=(12, 6))
        plt.plot(time_vector, signal, label='Filtered Signal', color='green')
        plt.plot(time_vector[peaks], signal[peaks], 'ro', label='Gradient Peaks (Max Rise)')
        plt.plot(time_vector[zero_crossings], signal[zero_crossings], 'bo', markersize=8, label='End of Relaxation (∇ ≤ 0)')
        plt.title('Filtered Signal with Contraction Start and End of Relaxation')
        plt.xlabel('Time (s)')
        plt.ylabel('Contraction Stress (mN / mm²)')
        plt.legend()
        plt.grid(True)
        plt.tight_layout()

        if save_path:
            print(f"Saving relaxation plot to: {save_path}")
            plt.savefig(save_path)
            plt.close()
        else:
            plt.show()

    return zero_crossings

In [4]:
def calculate_beating_rate(peak_indices, time_vector, display=True):
    """
    Calculate beating rate (BPM) and frequency (Hz) from peak indices.

    Parameters:
        peak_indices (array): Indices of detected peaks.
        time_vector (array): Time in seconds corresponding to signal samples.
        display (bool): If True, print BPM and frequency.

    Returns:
        bpm (float): Beating rate in beats per minute.
        freq_hz (float): Beating frequency in Hz.
        time_intervals (array): Time between beats (seconds).
    """
    if len(peak_indices) < 2:
        if display:
            print("Not enough peaks to calculate beating rate.")
        return None, None, []

    # Get time values at peaks
    peak_times = time_vector[peak_indices]

    # Calculate intervals and statistics
    time_intervals = np.diff(peak_times)
    avg_interval = np.mean(time_intervals)
    bpm = int(60 / avg_interval)
    freq_hz = 1 / avg_interval

    if display:
        print(f"Beating Rate: {bpm} BPM")
        print(f"Calculated Frequency: {freq_hz:.2f} Hz")

    return bpm, freq_hz, time_intervals


In [5]:
def compute_relative_contraction_amplitudes(signal, peaks, end_relax_idxs, threshold_percentage=0.9, display=True):
    """
    Compute per-cycle relative contraction amplitudes (e.g. 90% of peak rise) from end-of-relaxation to peak.

    Ensures that the first relax index is before the first peak,
    and the last peak is before the last relax index.

    Parameters:
        signal (array): The filtered contraction signal.
        peaks (array): Indices of contraction peaks.
        end_relax_idxs (array): Indices of relaxation end points.
        threshold_percentage (float): Fraction of full amplitude to extract (default 0.9).
        display (bool): If True, print amplitudes and summary.

    Returns:
        cycle_amplitudes (list): Full peak-to-start amplitudes.
        cycle_amplitudes_90 (list): 90% amplitude values per cycle.
        mean_90 (float): Mean 90% amplitude across all valid cycles.
    """
    # Ensure arrays
    peaks = np.array(peaks)
    end_relax_idxs = np.array(end_relax_idxs)

    # Align: first relax < first peak, last peak < last relax
    if end_relax_idxs[0] > peaks[0]:
        peaks = peaks[1:]
    if peaks[-1] > end_relax_idxs[-1]:
        peaks = peaks[:-1]

    # Trim to equal length
    min_len = min(len(peaks), len(end_relax_idxs))
    peaks = peaks[:min_len]
    end_relax_idxs = end_relax_idxs[:min_len]

    cycle_amplitudes = []
    cycle_amplitudes_90 = []

    for i, (peak, relax) in enumerate(zip(peaks, end_relax_idxs)):
        if relax < peak:
            amp = signal[peak] - signal[relax]
            cycle_amplitudes.append(amp)
            cycle_amplitudes_90.append(amp * threshold_percentage)
            if display:
                print(f"Cycle {i+1}: Full Amplitude = {amp:.3f}, {threshold_percentage*100:.0f}% = {amp * threshold_percentage:.3f}")
        elif display:
            print(f"Cycle {i+1}: Skipped (relax {relax} not before peak {peak})")

    mean_90 = np.mean(cycle_amplitudes_90) if cycle_amplitudes_90 else None

    if display:
        print(f"\nMean {threshold_percentage*100:.0f}% contraction amplitude: {mean_90:.3f}" if mean_90 else "No valid contraction cycles found.")

    return cycle_amplitudes, cycle_amplitudes_90, mean_90


In [6]:
def compute_time_to_peak(time_vector, peaks, end_relax_idxs, display=True):
    """
    Compute time-to-peak (TTP) from end-of-relaxation to the next peak for each contraction cycle.

    Ensures first relax comes before first peak, and last peak before last relax.

    Parameters:
        time_vector (array): Time values in seconds.
        peaks (array): Indices of contraction peaks.
        end_relax_idxs (array): Indices of end-of-relaxation points.
        display (bool): If True, print TTP values.

    Returns:
        time_to_peaks (list): Time to peak per cycle (s).
        mean_ttp (float): Mean TTP across valid cycles.
    """
    # Align and trim indices
    peaks = np.array(peaks)
    end_relax_idxs = np.array(end_relax_idxs)

    if end_relax_idxs[0] > peaks[0]:
        peaks = peaks[1:]
    if peaks[-1] > end_relax_idxs[-1]:
        peaks = peaks[:-1]

    min_len = min(len(peaks), len(end_relax_idxs))
    peaks = peaks[:min_len]
    end_relax_idxs = end_relax_idxs[:min_len]

    time_to_peaks = []

    for i, (peak_idx, relax_idx) in enumerate(zip(peaks, end_relax_idxs)):
        if relax_idx < peak_idx:
            ttp = time_vector[peak_idx] - time_vector[relax_idx]
            time_to_peaks.append(ttp)
            if display:
                print(f"Cycle {i+1}: Time to Peak = {ttp:.3f} s")
        elif display:
            print(f"Cycle {i+1}: Skipped (relax idx {relax_idx} not before peak idx {peak_idx})")

    mean_ttp = np.mean(time_to_peaks) if time_to_peaks else None

    if display:
        print(f"\nMean Time to Peak: {mean_ttp:.3f} s" if mean_ttp is not None else "No valid cycles found.")

    return time_to_peaks, mean_ttp


In [7]:
def compute_peak_to_90_relaxation_time(signal, time_vector, peaks, end_relax_idxs, threshold_percentage=0.9, display=True):
    """
    Compute the time from each peak to 90% relaxation level (within each contraction cycle).

    Parameters:
        signal (array): The filtered signal.
        time_vector (array): Time in seconds.
        peaks (array): Indices of contraction peaks.
        end_relax_idxs (array): Indices of end-of-relaxation points.
        threshold_percentage (float): Percentage of full relaxation (default 0.9 = 90% relaxation).
        display (bool): If True, print results.

    Returns:
        t90r_times (list): Time from peak to 90% relaxation for each valid cycle.
        mean_t90r (float): Mean time to 90% relaxation.
    """
    peaks = np.array(peaks)
    end_relax_idxs = np.array(end_relax_idxs)

    # Step 1: Align indices
    if end_relax_idxs[0] < peaks[0]:
        end_relax_idxs = end_relax_idxs[1:]
    if peaks[-1] > end_relax_idxs[-1]:
        peaks = peaks[:-1]

    min_len = min(len(peaks), len(end_relax_idxs))
    peaks = peaks[:min_len]
    end_relax_idxs = end_relax_idxs[:min_len]

    t90r_times = []

    for i, (peak_idx, relax_idx) in enumerate(zip(peaks, end_relax_idxs)):
        if relax_idx > peak_idx:
            peak_val = signal[peak_idx]
            relax_val = signal[relax_idx]

            threshold = relax_val + (peak_val - relax_val) * (1 - threshold_percentage)

            # Search between peak and relax to find first crossing below threshold
            for j in range(peak_idx, relax_idx + 1):
                if signal[j] <= threshold:
                    t90r = time_vector[j] - time_vector[peak_idx]
                    t90r_times.append(t90r)
                    if display:
                        print(f"Cycle {i+1}: T90R = {t90r:.3f} s (threshold = {threshold:.3f})")
                    break
        elif display:
            print(f"Cycle {i+1}: Skipped (relax before peak)")

    mean_t90r = np.mean(t90r_times) if t90r_times else None

    if display:
        print(f"\nMean Time to {threshold_percentage*100:.0f}% Relaxation: {mean_t90r:.3f} s" if mean_t90r else "No valid cycles found.")

    return t90r_times, mean_t90r


In [8]:

def exp_decay(t, P0, Pasym, tau):
    return (P0 - Pasym) * np.exp(-t / tau) + Pasym

def compute_relaxation_time_constants(signal, time_vector, peaks, end_relax_idxs, display_graphs=False, display_print=False):
    """
    Compute exponential decay time constant (τ) for each contraction cycle (from peak to end of relaxation).

    Parameters:
        signal (array): Filtered contraction signal.
        time_vector (array): Time vector (same length as signal).
        peaks (array): Indices of contraction peaks.
        end_relax_idxs (array): Indices of end-of-relaxation points.
        display (bool): If True, plot fits and print τ per cycle.

    Returns:
        taus (list): Tau values per valid contraction.
        mean_tau (float): Mean tau across valid cycles.
    """
    # Ensure numpy arrays
    peaks = np.array(peaks)
    end_relax_idxs = np.array(end_relax_idxs)
    time_vector = np.array(time_vector)
    signal = np.array(signal)

    # Step 1: Align indices so that peak < relax for each pair
    if end_relax_idxs[0] < peaks[0]:
        end_relax_idxs = end_relax_idxs[1:]
    if peaks[-1] > end_relax_idxs[-1]:
        peaks = peaks[:-1]

    # Step 2: Trim to equal length
    min_len = min(len(peaks), len(end_relax_idxs))
    peaks = peaks[:min_len]
    end_relax_idxs = end_relax_idxs[:min_len]

    taus = []

    for i, (peak_idx, relax_idx) in enumerate(zip(peaks, end_relax_idxs)):
        if relax_idx <= peak_idx:
            if display:
                print(f"Cycle {i+1}: Skipped (relax {relax_idx} not after peak {peak_idx})")
            continue

        # Segment: from peak to relax
        t_segment = time_vector[peak_idx:relax_idx]
        s_segment = signal[peak_idx:relax_idx]

        # Check segment length
        if len(t_segment) < 3:
            if display:
                print(f"Cycle {i+1}: Skipped (segment too short)")
            continue

        # Time shifted to start at 0
        t_decay = t_segment - t_segment[0]
        p_decay = s_segment

        # Initial parameter guess
        P0_guess = p_decay[0]
        Pasym_guess = p_decay[-1]
        tau_guess = (t_decay[-1] - t_decay[0]) / 2

        try:
            popt, _ = curve_fit(exp_decay, t_decay, p_decay,
                                p0=[P0_guess, Pasym_guess, tau_guess],
                                maxfev=10000)
            _, _, tau_fit = popt
            taus.append(tau_fit)

            if display_graphs:
                fit_curve = exp_decay(t_decay, *popt)
                plt.figure(figsize=(8, 4))
                plt.plot(t_segment, p_decay, 'b.', label='Data')
                plt.plot(t_segment, fit_curve, 'r--', label=f'Fit (τ = {tau_fit:.2f} s)')
                plt.title(f'Exponential Decay Fit — Cycle {i+1}')
                plt.xlabel('Time (s)')
                plt.ylabel('Contraction Stress (mN/mm²)')
                plt.legend()
                plt.grid(True)
                plt.tight_layout()
                plt.show()

            if display_print:
                print(f"Cycle {i+1}: τ = {tau_fit:.3f} s")

        except RuntimeError:
            if display_print:
                print(f"Cycle {i+1}: Fit failed.")

    mean_tau = np.mean(taus) if taus else None
    if display_print:
        print(f"\nMean τ across cycles: {mean_tau:.3f} s" if mean_tau is not None else "No valid tau values.")

    return taus, mean_tau

In [9]:
def create_summary_excel_with_means(signal, time_vector, peaks, end_relax_idxs, output_filename="contraction_summary_with_means.xlsx"):
    bpm, freq_hz, _ = calculate_beating_rate(peaks, time_vector, display=False)
    amps, amps_90, mean_amp_90 = compute_relative_contraction_amplitudes(signal, peaks, end_relax_idxs, display=False)
    ttp, mean_ttp = compute_time_to_peak(time_vector, peaks, end_relax_idxs, display=False)
    t90r, mean_t90r = compute_peak_to_90_relaxation_time(signal, time_vector, peaks, end_relax_idxs, display=False)
    taus, mean_tau = compute_relaxation_time_constants(signal, time_vector, peaks, end_relax_idxs,display_graphs=False,
    display_print = False)

    max_len = max(len(amps_90), len(ttp), len(t90r), len(taus))
    def pad(lst): return lst + [np.nan] * (max_len - len(lst))

    data = {
        "90% Amplitude (mN/mm²)": pad(amps_90),
        "Time to Peak (s)": pad(ttp),
        "Time to 90% Relaxation (s)": pad(t90r),
        "Tau (s)": pad(taus),
    }

    df = pd.DataFrame(data)
    df.loc["Mean"] = df.mean(numeric_only=True)

    meta = pd.DataFrame({
        "Parameter": [
            "Beating Rate (BPM)",
            "Frequency (Hz)",
            "Mean 90% Amplitude (mN/mm²)",
            "Mean Time to Peak (s)",
            "Mean Time to 90% Relaxation (s)",
            "Mean Tau (s)"
        ],
        "Value": [
            bpm,
            freq_hz,
            mean_amp_90,
            mean_ttp,
            mean_t90r,
            mean_tau
        ]
    })

    with pd.ExcelWriter(output_filename) as writer:
        meta.to_excel(writer, sheet_name="Summary", index=False)
        df.to_excel(writer, sheet_name="Per Cycle", index_label="Cycle")
    print(f'Excel with the parametes is prepered')
    return df, meta


In [10]:
# file_name = "stress_1HZ_1_flood_fill.csv"
# df =  pd.read_csv(file_name)
# contraction_stress, frame_time_point = df['stress_kpa'] , df['frame']

In [19]:
# === Input & Output Paths ===
input_folder = r"C:\Users\CaspiLab\Desktop\smarthreart - Dor's codes\4DCELL DATA\1HZ"
output_folder = r"C:\Users\CaspiLab\Desktop\smarthreart - Dor's codes\18 TIFFS for comperison\4DCell params results"
os.makedirs(output_folder, exist_ok=True)

# === Loop Through Files ===
excel_files = glob(os.path.join(input_folder, "*.csv"))

for file_path in excel_files:
    try:
        file_name = os.path.basename(file_path)
        print(f"Processing: {file_name}")
        
        # === Load Data ===
        df = pd.read_csv(file_path)
        filtered_full = df['contraction_stress (mPa*mm^2)']
        frame_time_point_sec = df['frame_time_point (s)']

        # === Find Peaks ===
        peaks, _ = find_peaks(filtered_full, prominence=0.1, distance=20)

        # === Gradient & End Relax Points (save plots) ===
        base_name = os.path.splitext(file_name.split(" -")[0])[0]
        gradient_plot_path = os.path.join(output_folder, f"4Dcell {base_name}_gradient_plot.png")
        relaxation_plot_path = os.path.join(output_folder, f"4Dcell {base_name}_peaks_relax_plot.png")

        # Save Gradient Plot
        gradient_full = compute_signal_gradient(
            filtered_full,
            frame_time_point_sec,
            display=False,
            label='Full MA Filtered',
            save_path=gradient_plot_path
        )

        # Save Relaxation Plot
        end_relax_idxs = detect_contraction_rise_points(
            signal=filtered_full,
            time_vector=frame_time_point_sec,
            gradient_prominence=2,
            min_peak_distance=20,
            display=False,
            save_path=relaxation_plot_path
        )

        # === Parameter Calculations ===
        bpm, freq_hz, intervals = calculate_beating_rate(peaks, frame_time_point_sec, display=False)
        ttp_list, mean_ttp = compute_time_to_peak(frame_time_point_sec, peaks, end_relax_idxs, display=False)
        t90r_list, mean_t90r = compute_peak_to_90_relaxation_time(
            signal=filtered_full,
            time_vector=frame_time_point_sec,
            peaks=peaks,
            end_relax_idxs=end_relax_idxs,
            threshold_percentage=0.9,
            display=False
        )
        taus, mean_tau = compute_relaxation_time_constants(
            signal=filtered_full,
            time_vector=frame_time_point_sec,
            peaks=peaks,
            end_relax_idxs=end_relax_idxs,
            display_graphs=False,
            display_print=False
        )

        # === Save Summary Excel ===
        output_filename = os.path.join(output_folder, f"4Dcell parametres of {base_name}.xlsx")
        df_cycles, df_summary = create_summary_excel_with_means(
            signal=filtered_full,
            time_vector=frame_time_point_sec,
            peaks=peaks,
            end_relax_idxs=end_relax_idxs,
            output_filename=output_filename
        )

        print(f"Saved: {output_filename}")

    except Exception as e:
        print(f"Error processing {file_name}: {e}")

Processing: SH_1Hz_A4_001current001.avi - נוצר באמצעות Clipchamp_analyzed_prediction_features.csv
Saving relaxation plot to: C:\Users\CaspiLab\Desktop\smarthreart - Dor's codes\18 TIFFS for comperison\4DCell params results\4Dcell SH_1Hz_A4_001current001_peaks_relax_plot.png
Excel with the parametes is prepered
Saved: C:\Users\CaspiLab\Desktop\smarthreart - Dor's codes\18 TIFFS for comperison\4DCell params results\4Dcell parametres of SH_1Hz_A4_001current001.xlsx
Processing: SH_1Hz_A4_002current001.avi - נוצר באמצעות Clipchamp_analyzed_prediction_features.csv
Saving relaxation plot to: C:\Users\CaspiLab\Desktop\smarthreart - Dor's codes\18 TIFFS for comperison\4DCell params results\4Dcell SH_1Hz_A4_002current001_peaks_relax_plot.png
Excel with the parametes is prepered
Saved: C:\Users\CaspiLab\Desktop\smarthreart - Dor's codes\18 TIFFS for comperison\4DCell params results\4Dcell parametres of SH_1Hz_A4_002current001.xlsx
Processing: SH_1Hz_A4_003current001.avi - נוצר באמצעות Clipchamp_a