# Loading of Packages
Here you load all necessary packages to run your code.
As you can see it imports functions from the Segmentation_function.py file.

I would recommend pasting these package imports, together with the ones from Segmentation_function.py, into ChatGPT and asking which packages you need to install via `pip install` in your conda environment to get the code running.

In [26]:
import os
import numpy as np
import pandas as pd
import anndata
from tifffile import imread, imwrite
from cellpose import models
import matplotlib.pyplot as plt
from bigfish.detection import get_object_radius_pixel
from skimage.measure import label, regionprops
from Segmentation_function import (
    load_nd_metadata, extract_tiff_metadata, generate_mips,
    run_cellpose_segmentation, extract_nuclei_properties, build_anndata,
    segment_cytoplasm_by_cellpose, detect_smfish_spots,
    generate_segmentation_report, process_sample, analyze_sample
)

import warnings
# Silence noisy warning categories so the notebook output stays readable.
# These filters are installed globally and stay active for every later cell.
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
# NOTE(review): this warning class is reached through anndata's private
# `_core` package, so the attribute path may differ between anndata
# releases — confirm it resolves in the installed version.
warnings.filterwarnings("ignore", category=anndata._core.aligned_df.ImplicitModificationWarning)

## Definition of Functions 

FYI: here I define some functions that run over several folders instead of over single samples.
In the future these will probably be hidden in the Segmentation_function.py file, so that only the run command below stays visible.
For now, though, keeping them here is useful for tweaking parameters such as the SNR threshold of 3.0, the cropped area, or other things like the channel names.

In [27]:
def process_all_samples(folder_path: str, crop_coords=(800, 1200, 800, 1200), snr_threshold: float = 3.0):
    """
    Process every complete sample found directly inside ``folder_path``.

    A sample counts as complete when ``<name>.nd`` sits next to both
    ``<name>_w1conf561Virtex.stk`` and ``<name>_w2conf405Virtex.stk``.
    Each complete sample is handed to ``process_sample`` in sorted order;
    an exception raised for one sample is printed and the loop continues
    with the next sample.

    Parameters:
        folder_path (str): Directory containing the .nd and .stk files.
        crop_coords (tuple): Forwarded unchanged to ``process_sample``
            (crop region for the spot view in the segmentation report).
        snr_threshold (float): Forwarded unchanged to ``process_sample``
            (SNR threshold for filtering smFISH spots).

    Returns:
        None
    """
    nd_suffix = ".nd"
    # A set makes the two "is the .stk there?" lookups per sample O(1).
    files = set(os.listdir(folder_path))
    sample_names = set()

    for file in files:
        # Names starting with "._" are macOS side-car files (filtered the same
        # way by the other folder scanners in this file); they mirror the real
        # file names and would otherwise register as bogus samples.
        if file.startswith("._") or not file.endswith(nd_suffix):
            continue
        # Strip only the trailing extension; str.replace would also remove
        # any ".nd" that happens to occur in the middle of the name.
        base = file[: -len(nd_suffix)]
        stk1 = f"{base}_w1conf561Virtex.stk"
        stk2 = f"{base}_w2conf405Virtex.stk"
        if stk1 in files and stk2 in files:
            sample_names.add(base)

    if not sample_names:
        print("⚠️ No complete samples (.nd + .stk files) found.")
        return

    ordered_samples = sorted(sample_names)
    print(f"🔎 Found {len(ordered_samples)} samples to process: {ordered_samples}")

    for sample_name in ordered_samples:
        try:
            process_sample(folder_path, sample_name, crop_coords=crop_coords, snr_threshold=snr_threshold)
        except Exception as e:
            # Best-effort batch run: report the failure and keep going.
            print(f"❌ Failed to process {sample_name}: {e}")


In [28]:
def process_all_timepoints(root_folder: str, crop_coords=(800, 1200, 800, 1200), snr_threshold: float = 3.0):
    """
    Process all timepoint subfolders inside a root directory.

    Every qualifying subfolder is passed to ``process_all_samples`` in sorted
    order. An exception raised for one timepoint is printed and the loop
    continues with the next one.

    Parameters:
        root_folder (str): Path to root folder (e.g., /path/to/Eglantine).
        crop_coords (tuple): Crop coordinates for segmentation report.
        snr_threshold (float): SNR threshold for smFISH filtering.

    Returns:
        None
    """
    # Same exclusions as analyze_all_timepoints(), so both stages walk the
    # same set of folders: hidden entries (this also covers "._*") and
    # known output folders are not timepoints.
    non_timepoint_names = {"analysis_output", "Analysis", "results"}
    subfolders = sorted(
        f for f in os.listdir(root_folder)
        if not f.startswith(".")
        and f not in non_timepoint_names
        and os.path.isdir(os.path.join(root_folder, f))
    )

    if not subfolders:
        print("⚠️ No valid timepoint subfolders found.")
        return

    print(f"🕒 Found {len(subfolders)} timepoints: {subfolders}\n")

    # Loop through each timepoint folder
    for tp in subfolders:
        tp_path = os.path.join(root_folder, tp)
        print("=" * 70)
        print(f"🧪 Processing timepoint folder: {tp}")
        print("=" * 70)
        try:
            process_all_samples(tp_path, crop_coords=crop_coords, snr_threshold=snr_threshold)
        except Exception as e:
            # Best-effort batch run: report the failure and keep going.
            print(f"❌ Failed to process timepoint '{tp}': {e}")
        print()  # Line break for clarity between timepoints

In [29]:
# ===== Wrapper: run all timepoints in all replicates of one condition =====

def _list_clean_subfolders(parent, exclude=frozenset({"analysis_output", "Analysis", "results"})):
    """
    Return the sorted names of the usable subfolders directly inside ``parent``.

    Parameters:
        parent (str): Directory to scan.
        exclude: Container of folder names to leave out. The default is an
            immutable frozenset so the shared default can never be mutated
            between calls.

    Returns:
        list[str]: Names (not full paths) of subdirectories that do not start
        with "." and are not listed in ``exclude``.
    """
    return sorted(
        d for d in os.listdir(parent)
        # Cheap string checks first; "." also covers the "._*" side-car names.
        if not d.startswith(".")
        and d not in exclude
        and os.path.isdir(os.path.join(parent, d))
    )

def process_condition(condition_folder: str, crop_coords=(800, 1200, 800, 1200), snr_threshold: float = 3.0):
    """
    Run the full segmentation pipeline for every replicate and timepoint inside a condition.

    Directory layout expected:
        condition_folder/
          replicate 1/
            0h/, 1h/, ..., Nh/
          replicate 2/
            0h/, ..., Nh/
          ...

    Each replicate folder is passed to `process_all_timepoints()`. An exception
    raised for one replicate is printed and the loop continues with the next.

    Parameters:
        condition_folder (str): Path to the condition directory.
        crop_coords (tuple): Forwarded unchanged to `process_all_timepoints()`.
        snr_threshold (float): Forwarded unchanged to `process_all_timepoints()`.

    Returns:
        None
    """
    if not os.path.isdir(condition_folder):
        print(f"❌ Condition path not found: {condition_folder}")
        return

    replicates = _list_clean_subfolders(condition_folder)
    if not replicates:
        print(f"⚠️ No replicate folders found in {condition_folder}")
        return

    # normpath drops a trailing separator; without it basename() returns ""
    # for paths such as ".../condition1/" and the banner shows no name.
    condition_name = os.path.basename(os.path.normpath(condition_folder))
    print(f"🧪 Condition: {condition_name} — Found {len(replicates)} replicates: {replicates}\n")
    for rep in replicates:
        rep_path = os.path.join(condition_folder, rep)
        print("#" * 80)
        print(f"🔁 Processing replicate: {rep}")
        print("#" * 80)
        try:
            process_all_timepoints(rep_path, crop_coords=crop_coords, snr_threshold=snr_threshold)
        except Exception as e:
            # Best-effort batch run: report the failure and keep going.
            print(f"❌ Failed replicate '{rep}': {e}")
        print()  # spacing


def analyze_condition(condition_folder: str, snr_threshold: float = 3.0):
    """
    Run the analysis stage for every replicate (and all timepoints within) of a condition.

    Each replicate folder is passed to `analyze_all_timepoints()`. An exception
    raised for one replicate is printed and the loop continues with the next.

    Parameters:
        condition_folder (str): Path to the condition directory.
        snr_threshold (float): Forwarded unchanged to `analyze_all_timepoints()`.

    Returns:
        None
    """
    if not os.path.isdir(condition_folder):
        print(f"❌ Condition path not found: {condition_folder}")
        return

    replicates = _list_clean_subfolders(condition_folder)
    if not replicates:
        print(f"⚠️ No replicate folders found in {condition_folder}")
        return

    # normpath drops a trailing separator; without it basename() returns ""
    # for paths such as ".../condition1/" and the banner shows no name.
    condition_name = os.path.basename(os.path.normpath(condition_folder))
    print(f"📊 Analyzing condition: {condition_name} — {len(replicates)} replicates: {replicates}\n")
    for rep in replicates:
        rep_path = os.path.join(condition_folder, rep)
        print("#" * 80)
        print(f"📦 Analyzing replicate: {rep}")
        print("#" * 80)
        try:
            analyze_all_timepoints(rep_path, snr_threshold=snr_threshold)
        except Exception as e:
            # Best-effort batch run: report the failure and keep going.
            print(f"❌ Failed analysis for replicate '{rep}': {e}")
        print()  # spacing


## Main Run Function 
This function runs the segmentation and spot detection on a folder whose subfolders are timepoints (1h, 2h, ..., 8h); it processes every timepoint and every sample within it.
To use it, set the root_folder variable to your new folder path (please always work on a copy of your data and never on the raw data!).

In [None]:
# Disabled example runs: the whole cell body is a string literal, so none of
# the calls inside it execute. Remove the surrounding triple quotes (and keep
# only the variant you need) to run the segmentation.
"""
#This is my Code for a single folder (1 timepoitn like the 2h only)
if __name__ == "__main__":
    folder = "/Volumes/Project_PhD/6_Coding/Eglante/2h"
    process_all_samples(folder)


# This is the code for multiple timepoints here you give the path to your folder wiht all the timepoint folders inside the smapels everthing else goes automatic
if __name__ == "__main__":
    root_folder =  root_folder# Folder containing 0h, 1h, ..., 10h
    process_all_timepoints(root_folder)
"""

## Analysis Run Function definition

This code defines the analysis that is run on all segmented images, e.g. whether a spot lies inside or outside the nuclei. Later I should move this into the main run code, but for now it lives here.

In [37]:
def analyze_all_samples(folder_path: str, snr_threshold: float = 3.0):
    """
    Run ``analyze_sample`` on each ``*_nuclei_spots.h5ad`` file in a folder.

    Files whose name starts with "._" are ignored. Samples are handled in
    sorted path order; an exception raised for one sample is printed and
    the remaining samples are still analyzed.

    Parameters:
        folder_path (str): Path containing .h5ad files.
        snr_threshold (float): Forwarded unchanged to ``analyze_sample``.
    """
    suffix = "_nuclei_spots.h5ad"

    sample_paths = []
    for entry in os.listdir(folder_path):
        if entry.startswith("._") or not entry.endswith(suffix):
            continue
        sample_paths.append(os.path.join(folder_path, entry))

    if not sample_paths:
        print("⚠️ No .h5ad files found for analysis.")
        return

    print(f"🔍 Found {len(sample_paths)} samples for analysis.")

    sample_paths.sort()
    rule = "=" * 60
    for h5ad_path in sample_paths:
        sample_name = os.path.basename(h5ad_path).replace(suffix, "")
        print(rule)
        print(f"📦 Analyzing Sample: {sample_name}")
        print(rule)
        try:
            analyze_sample(h5ad_path, snr_threshold=snr_threshold)
        except Exception as err:
            print(f"❌ Failed to analyze {sample_name}: {err}")
        print()  # blank line separates consecutive samples

In [38]:
def analyze_all_timepoints(root_folder: str, snr_threshold: float = 3.0):
    """
    Analyze the ``analysis_output`` folder of every timepoint under a root directory.

    Subfolders starting with "." and the names "analysis_output", "Analysis"
    and "results" are not treated as timepoints. A timepoint without an
    ``analysis_output`` directory is skipped with a message; an exception
    raised for one timepoint is printed and the loop continues.

    Parameters:
        root_folder (str): Path to root folder (e.g., /path/to/Eglantine).
        snr_threshold (float): Forwarded unchanged to ``analyze_all_samples``.
    """
    non_timepoint_names = {"analysis_output", "Analysis", "results"}

    # Collect candidate timepoint folders one entry at a time.
    subfolders = []
    for name in os.listdir(root_folder):
        if name in non_timepoint_names or name.startswith(("._", ".")):
            continue
        if os.path.isdir(os.path.join(root_folder, name)):
            subfolders.append(name)
    subfolders.sort()

    if not subfolders:
        print("⚠️ No valid timepoint subfolders found.")
        return

    print(f"🕒 Found {len(subfolders)} timepoints to analyze: {subfolders}\n")

    banner = "=" * 70
    for tp in subfolders:
        tp_output_path = os.path.join(root_folder, tp, "analysis_output")
        print(banner)
        print(f"🧪 Analyzing timepoint: {tp}")
        print(banner)
        if os.path.isdir(tp_output_path):
            try:
                analyze_all_samples(tp_output_path, snr_threshold=snr_threshold)
            except Exception as err:
                print(f"❌ Failed to analyze timepoint '{tp}': {err}")
            print()  # spacing between timepoints
        else:
            print(f"⚠️ Skipping '{tp}': no analysis_output at {tp_output_path}\n")


## Analysis Run code

Again, replace the folder path here with the same one you used when running the previous code. It will then analyse each .h5ad file in the timepoint and sample folders.

In [None]:
# Placeholder path used by the run cells below — replace it with your own
# data folder before running them.
root_folder = "path/to/your/data/folder"

In [None]:
# Disabled example runs: the text between the triple quotes is a string
# literal, so none of the calls inside it execute. Remove the quotes (and keep
# only the variant you need) to run the analysis on one folder or on a
# folder of timepoints.
"""
# This is my Code for a single folder (1 timepoitn like the 2h only)
if __name__ == "__main__":
    root_folder = "/Volumes/Project_PhD/6_Coding/Eglante/2h"  # contains 0h, 1h, ..., 10h
    analyze_all_samples(root_folder, snr_threshold=3)


# This is the code for multiple timepoints here you give the path to your folder wiht all the timepoint folders inside the smapels everthing else goes automatic
if __name__ == "__main__":
    root_folder = root_folder
    analyze_all_timepoints(root_folder, snr_threshold=3)

    
"""
# ===== Example usage: segment, then analyze, one condition =====
if __name__ == "__main__":
    # Work on one condition folder at a time, e.g. ".../Suntag/condition1".
    condition_folder = root_folder

    # Stage 1: segmentation and spot detection for all replicates/timepoints.
    process_condition(
        condition_folder,
        snr_threshold=3.0,
        crop_coords=(800, 1200, 800, 1200),
    )

    # Stage 2: analysis, which needs the h5ad files and outputs of stage 1.
    analyze_condition(condition_folder, snr_threshold=3.0)

## Merging of Analysis data

Because of the way I coded it, every sample has its own report with separate .csv and .h5ad files. This code runs over everything and combines all .csv files into one experiment-level .csv (and all .h5ad files into one .h5ad), which makes the data easy to handle and plot later.

You may want to change the output_name parameter here; otherwise the output files are named experiment_merged.

In [41]:
import os
import anndata
import pandas as pd

def merge_experiment(
    root_folder: str,
    output_name: str = "experiment_merged",
    output_subdir: str = "experiment_output_corrected",
    csv_suffix: str = "_SNR2",
):
    """
    Merge all .h5ad and .csv files from every sample in all timepoint subfolders
    into a single experiment-level .h5ad and two experiment-level .csv files.

    For each ``<root_folder>/<timepoint>/analysis_output`` directory, every
    ``<sample>_nuclei_spots.h5ad`` is loaded together with its optional
    ``<sample>_summary_statistics.csv`` and ``<sample>_cellwise_spot_counts.csv``.
    Each table is tagged with ``timepoint`` and ``sample_name`` columns before
    merging. A sample that raises while loading is reported and skipped.

    Parameters:
        root_folder (str): Root directory containing timepoint folders.
        output_name (str): Prefix for the output files (no extension).
        output_subdir (str): Name of the folder created inside ``root_folder``
            that receives the merged files.
        csv_suffix (str): Text appended to the two CSV file names before
            ".csv". The default "_SNR2" is kept so existing output names do
            not change. NOTE(review): the run cells in this file use
            snr_threshold=3.0 — confirm the label matches the data.

    Returns:
        None
    """
    h5ad_suffix = "_nuclei_spots.h5ad"
    merged_adata_list = []
    merged_summary_stats = []
    merged_cellwise_spots = []

    subfolders = sorted(
        f for f in os.listdir(root_folder)
        if os.path.isdir(os.path.join(root_folder, f)) and not f.startswith("._")
    )

    print(f"🔍 Scanning {len(subfolders)} timepoints...")

    for tp in subfolders:
        tp_output = os.path.join(root_folder, tp, "analysis_output")
        # isdir (not exists): a stray file with this name must be skipped,
        # otherwise os.listdir below raises and aborts the whole merge.
        if not os.path.isdir(tp_output):
            continue

        # Sorted so the row order of the merged outputs is reproducible;
        # os.listdir returns entries in arbitrary order.
        h5ad_files = sorted(
            f for f in os.listdir(tp_output)
            if f.endswith(h5ad_suffix) and not f.startswith("._")
        )

        for h5ad_file in h5ad_files:
            # Strip only the trailing suffix to recover the sample name.
            sample_base = h5ad_file[: -len(h5ad_suffix)]
            h5ad_path = os.path.join(tp_output, h5ad_file)
            stats_csv = os.path.join(tp_output, f"{sample_base}_summary_statistics.csv")
            spot_csv = os.path.join(tp_output, f"{sample_base}_cellwise_spot_counts.csv")

            try:
                adata = anndata.read_h5ad(h5ad_path)
                adata.obs["timepoint"] = tp
                # Tag cells with their sample like the CSV tables are, but
                # never overwrite a column the file already carries.
                if "sample_name" not in adata.obs.columns:
                    adata.obs["sample_name"] = sample_base
                merged_adata_list.append(adata)

                # Both CSVs are optional and get the same two tag columns.
                for csv_path, bucket in (
                    (stats_csv, merged_summary_stats),
                    (spot_csv, merged_cellwise_spots),
                ):
                    if os.path.exists(csv_path):
                        table = pd.read_csv(csv_path)
                        table["timepoint"] = tp
                        table["sample_name"] = sample_base
                        bucket.append(table)

            except Exception as e:
                # Best-effort merge: report the sample and continue.
                print(f"❌ Failed to load {sample_base} in {tp}: {e}")

    # Merge AnnData
    if not merged_adata_list:
        print("⚠️ No .h5ad files successfully loaded.")
        return

    print("🔗 Merging AnnData objects...")
    merged_adata = anndata.concat(merged_adata_list, axis=0, join="outer", merge="unique")

    # Merge CSVs (an empty frame is written when no CSV of a kind was found)
    summary_df = pd.concat(merged_summary_stats, ignore_index=True) if merged_summary_stats else pd.DataFrame()
    spots_df = pd.concat(merged_cellwise_spots, ignore_index=True) if merged_cellwise_spots else pd.DataFrame()

    # Save results
    output_dir = os.path.join(root_folder, output_subdir)
    os.makedirs(output_dir, exist_ok=True)

    adata_path = os.path.join(output_dir, f"{output_name}.h5ad")
    summary_csv = os.path.join(output_dir, f"{output_name}_summary_statistics{csv_suffix}.csv")
    spots_csv = os.path.join(output_dir, f"{output_name}_cellwise_spot_counts{csv_suffix}.csv")

    merged_adata.write(adata_path)
    summary_df.to_csv(summary_csv, index=False)
    spots_df.to_csv(spots_csv, index=False)

    print("\n✅ Experiment-level files saved:")
    print(f"📁 Merged AnnData: {adata_path}")
    print(f"📊 Summary CSV:    {summary_csv}")
    print(f"📊 Cellwise CSV:   {spots_csv}")


In [None]:
# ---- Merge one replicate ----
# Placeholder: set this to the replicate folder that holds the timepoint
# subfolders ("0h", "1h", ...).
root_folder = "path/to/single/replicate"

if __name__ == "__main__":
    root = root_folder
    merge_experiment(root_folder=root, output_name="experiment_eglante")