In [1]:
"""Execute PCP generic pipeline."""

'Execute PCP generic pipeline.'

In [2]:
from pathlib import Path

from pipecraft.backend.snakemake import SnakeMakeBackend, SnakeMakeConfig

from starrynight.experiments.common import AcquisitionOrderType, ImageFrameType
from starrynight.experiments.pcp_generic import PCPGeneric, PCPGenericInitConfig
from starrynight.modules.analysis.analysis_cp import AnalysisInvokeCPModule
from starrynight.modules.analysis.analysis_cppipe import AnalysisGenCPPipeModule
from starrynight.modules.analysis.analysis_load_data import (
    AnalysisGenLoadDataModule,
)
from starrynight.modules.cp_illum_apply.apply_cp import (
    CPApplyIllumInvokeCPModule,
)
from starrynight.modules.cp_illum_apply.apply_cppipe import (
    CPApplyIllumGenCPPipeModule,
)
from starrynight.modules.cp_illum_apply.apply_load_data import (
    CPApplyIllumGenLoadDataModule,
)

# cp_illum_calc
from starrynight.modules.cp_illum_calc.calc_cp import CPCalcIllumInvokeCPModule
from starrynight.modules.cp_illum_calc.calc_cppipe import (
    CPCalcIllumGenCPPipeModule,
)
from starrynight.modules.cp_illum_calc.calc_load_data import (
    CPCalcIllumGenLoadDataModule,
)

# cp_segcheck
from starrynight.modules.cp_segcheck.segcheck_cp import CPSegcheckInvokeCPModule
from starrynight.modules.cp_segcheck.segcheck_cppipe import (
    CPSegcheckGenCPPipeModule,
)
from starrynight.modules.cp_segcheck.segcheck_load_data import (
    CPSegcheckGenLoadDataModule,
)

# inventory and index
from starrynight.modules.gen_index import GenIndexModule
from starrynight.modules.gen_inv import GenInvModule
from starrynight.modules.sbs_illum_apply.apply_cp import (
    SBSApplyIllumInvokeCPModule,
)
from starrynight.modules.sbs_illum_apply.apply_cppipe import (
    SBSApplyIllumGenCPPipeModule,
)
from starrynight.modules.sbs_illum_apply.apply_load_data import (
    SBSApplyIllumGenLoadDataModule,
)
from starrynight.modules.sbs_illum_calc.calc_cp import (
    SBSCalcIllumInvokeCPModule,
)
from starrynight.modules.sbs_illum_calc.calc_cppipe import (
    SBSCalcIllumGenCPPipeModule,
)
from starrynight.modules.sbs_illum_calc.calc_load_data import (
    SBSCalcIllumGenLoadDataModule,
)
from starrynight.modules.sbs_preprocess.preprocess_cp import (
    SBSPreprocessInvokeCPModule,
)  # noqa: E501
from starrynight.modules.sbs_preprocess.preprocess_cppipe import (
    SBSPreprocessGenCPPipeModule,
)
from starrynight.modules.sbs_preprocess.preprocess_load_data import (
    SBSPreprocessGenLoadDataModule,
)
from starrynight.schema import DataConfig

Setup dataset paths

In [3]:
dataset_path = Path("../../scratch/starrynight_example_input")
workspace_path = Path("../../scratch/starrynight_example_output")
exec_runs = Path("../../scratch/starrynight_runs")
exec_mounts = Path("../../scratch/starrynight_mounts")

Create data config

In [4]:
data_config = DataConfig(
    dataset_path=dataset_path,
    storage_path=dataset_path,
    workspace_path=workspace_path,
)

Create execution engine config config

In [12]:
backend_config = SnakeMakeConfig(use_fluent_bit=False, print_exec=True, background=False)

Configure the generate inventory module
This module is special and doesn't require an experiment for configuration

In [14]:
gen_inv_mod = GenInvModule.from_config(data_config)
exec_backend = SnakeMakeBackend(
    gen_inv_mod.pipe, backend_config, exec_runs / "run001", exec_mounts
)
run = exec_backend.run()
run.wait()
run.print_log()

Assuming unrestricted shared filesystem usage.
host: GPFDA-11A
Building DAG of jobs...
Using shell: /nix/store/8vpg72ik2kgxfj05lc56hkqrdrfl8xi9-bash-5.2p37/bin/bash
Provided cores: 10
Rules claiming more threads will be scaled down.
Singularity containers: ignored
Job stats:
job                   count
------------------  -------
all                       1
generate_inventory        1
total                     2

Select jobs to execute...
Execute 1 jobs...

[Thu Apr 17 11:37:55 2025]
localrule generate_inventory:
    output: /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/inventory/completed.txt
    jobid: 1
    reason: Forced execution
    resources: tmpdir=/tmp

starrynight inventory gen -d /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_input -o /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/inventory






Writing inventory: 100%|██████████| 1/1 [00:00<00:00, 23831.27it/s]A[A[A
Writing inventory: 100

['Assuming unrestricted shared filesystem usage.\n',
 'host: GPFDA-11A\n',
 'Building DAG of jobs...\n',
 'Using shell: '
 '/nix/store/8vpg72ik2kgxfj05lc56hkqrdrfl8xi9-bash-5.2p37/bin/bash\n',
 'Provided cores: 10\n',
 'Rules claiming more threads will be scaled down.\n',
 'Singularity containers: ignored\n',
 'Job stats:\n',
 'job                   count\n',
 '------------------  -------\n',
 'all                       1\n',
 'generate_inventory        1\n',
 'total                     2\n',
 '\n',
 'Select jobs to execute...\n',
 'Execute 1 jobs...\n',
 '\n',
 '[Thu Apr 17 11:35:14 2025]\n',
 'localrule generate_inventory:\n',
 '    output: '
 '/home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/inventory/completed.txt\n',
 '    jobid: 1\n',
 '    reason: Forced execution\n',
 '    resources: tmpdir=/tmp\n',
 '\n',
 'starrynight inventory gen -d '
 '/home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_input '
 '-o '
 '/home/ank/workspace/hub/

Touching output file /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/inventory/completed.txt.
[Thu Apr 17 11:37:59 2025]
Finished job 1.
1 of 2 steps (50%) done
Select jobs to execute...
Execute 1 jobs...

[Thu Apr 17 11:37:59 2025]
localrule all:
    input: /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/inventory/completed.txt
    jobid: 0
    reason: Forced execution
    resources: tmpdir=/tmp

echo 'done' > completed.txt
[Thu Apr 17 11:37:59 2025]
Finished job 0.
2 of 2 steps (100%) done
Complete log: .snakemake/log/2025-04-17T113755.319376.snakemake.log


Configure the generate index module
This module is special and doesn't require an experiment for configuration

In [15]:
gen_ind_mod = GenIndexModule.from_config(data_config)
exec_backend = SnakeMakeBackend(
    gen_ind_mod.pipe, backend_config, exec_runs / "run002", exec_mounts
)
run = exec_backend.run()
run.wait()

Assuming unrestricted shared filesystem usage.
host: GPFDA-11A
Building DAG of jobs...
Using shell: /nix/store/8vpg72ik2kgxfj05lc56hkqrdrfl8xi9-bash-5.2p37/bin/bash
Provided cores: 10
Rules claiming more threads will be scaled down.
Singularity containers: ignored
Job stats:
job               count
--------------  -------
all                   1
generate_index        1
total                 2

Select jobs to execute...
Execute 1 jobs...

[Thu Apr 17 11:40:05 2025]
localrule generate_index:
    input: /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/inventory/inventory.parquet
    output: /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/index/index.parquet
    jobid: 1
    reason: Forced execution
    resources: tmpdir=/tmp

starrynight index gen -i /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/inventory/inventory.parquet -o /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/

Configure the experiment with the generated index

In [16]:
index_path = workspace_path / "index/index.parquet"
pcp_exp_init = PCPGenericInitConfig(
    barcode_csv_path=Path(),
    cp_acquisition_order=AcquisitionOrderType.SNAKE,
    cp_img_frame_type=ImageFrameType.ROUND,
    cp_img_overlap_pct=10,
    sbs_acquisition_order=AcquisitionOrderType.SNAKE,
    sbs_img_frame_type=ImageFrameType.ROUND,
    sbs_img_overlap_pct=10,
    cp_nuclei_channel="DAPI",
    cp_cell_channel="PhalloAF750",
    cp_mito_channel="ZO1AF488",
    sbs_nuclei_channel="DAPI",
    sbs_cell_channel="PhalloAF750",
    sbs_mito_channel="ZO1AF488",
)
pcp_experiment = PCPGeneric.from_index(index_path, pcp_exp_init.model_dump())

Configure the following modules with the experiment

------------------------------------------------------------------

Step 1: CP calculate illum correction

In [20]:
# Gen load data
cp_calc_illum_load_data_mod = CPCalcIllumGenLoadDataModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    cp_calc_illum_load_data_mod.pipe,
    backend_config,
    exec_runs / "run003",
    exec_mounts,
)
run = exec_backend.run()
run.wait()

# Gen cppipe file
cp_calc_illum_cppipe_mod = CPCalcIllumGenCPPipeModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    cp_calc_illum_cppipe_mod.pipe,
    backend_config,
    exec_runs / "run004",
    exec_mounts,
)
run = exec_backend.run()
run.wait()

# Invoke cppipe file
cp_calc_illum_invoke_mod = CPCalcIllumInvokeCPModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    cp_calc_illum_invoke_mod.pipe,
    backend_config,
    exec_runs / "run005",
    exec_mounts,
)
run = exec_backend.run()
run.wait()

# ------------------------------------------------------------------

Assuming unrestricted shared filesystem usage.
host: GPFDA-11A
Building DAG of jobs...
Using shell: /nix/store/8vpg72ik2kgxfj05lc56hkqrdrfl8xi9-bash-5.2p37/bin/bash
Provided cores: 10
Rules claiming more threads will be scaled down.
Singularity containers: ignored
Job stats:
job                           count
--------------------------  -------
all                               1
cp_calc_illum_gen_loaddata        1
total                             2

Select jobs to execute...
Execute 1 jobs...

[Thu Apr 17 11:56:24 2025]
localrule cp_calc_illum_gen_loaddata:
    input: /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/index/index.parquet
    output: /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/cellprofiler/loaddata/cp/illum_calc/completed.txt
    jobid: 1
    reason: Forced execution
    resources: tmpdir=/tmp

starrynight illum calc loaddata -i /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/inde

Step 2: CP apply illum correction

In [21]:
# Gen load data
cp_apply_illum_load_data_mod = CPApplyIllumGenLoadDataModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    cp_apply_illum_load_data_mod.pipe,
    backend_config,
    exec_runs / "run006",
    exec_mounts,
)
run = exec_backend.run()
run.wait()

# Gen cppipe file
cp_apply_illum_cppipe_mod = CPApplyIllumGenCPPipeModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    cp_apply_illum_cppipe_mod.pipe,
    backend_config,
    exec_runs / "run007",
    exec_mounts,
)
run = exec_backend.run()
run.wait()

# Invoke cppipe file
cp_apply_illum_invoke_mod = CPApplyIllumInvokeCPModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    cp_apply_illum_invoke_mod.pipe,
    backend_config,
    exec_runs / "run008",
    exec_mounts,
)
run = exec_backend.run()
run.wait()

# ------------------------------------------------------------------

Assuming unrestricted shared filesystem usage.
host: GPFDA-11A
Building DAG of jobs...
Using shell: /nix/store/8vpg72ik2kgxfj05lc56hkqrdrfl8xi9-bash-5.2p37/bin/bash
Provided cores: 10
Rules claiming more threads will be scaled down.
Singularity containers: ignored
Job stats:
job                            count
---------------------------  -------
all                                1
cp_apply_illum_gen_loaddata        1
total                              2

Select jobs to execute...
Execute 1 jobs...

[Thu Apr 17 11:59:52 2025]
localrule cp_apply_illum_gen_loaddata:
    input: /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/index/index.parquet
    output: /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/cellprofiler/loaddata/cp/illum_apply/completed.txt
    jobid: 1
    reason: Forced execution
    resources: tmpdir=/tmp

starrynight illum apply loaddata -i /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_out

Step 3: CP segcheck

In [22]:
# Gen load data
cp_segcheck_load_data_mod = CPSegcheckGenLoadDataModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    cp_segcheck_load_data_mod.pipe,
    backend_config,
    exec_runs / "run009",
    exec_mounts,
)
run = exec_backend.run()
run.wait()

# Gen cppipe file
cp_segcheck_cppipe_mod = CPSegcheckGenCPPipeModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    cp_segcheck_cppipe_mod.pipe,
    backend_config,
    exec_runs / "run010",
    exec_mounts,
)
run = exec_backend.run()
run.wait()

# Invoke cppipe file
cp_segcheck_invoke_mod = CPSegcheckInvokeCPModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    cp_segcheck_invoke_mod.pipe,
    backend_config,
    exec_runs / "run011",
    exec_mounts,
)
run = exec_backend.run()
run.wait()

# ------------------------------------------------------------------

Assuming unrestricted shared filesystem usage.
host: GPFDA-11A
Building DAG of jobs...
Using shell: /nix/store/8vpg72ik2kgxfj05lc56hkqrdrfl8xi9-bash-5.2p37/bin/bash
Provided cores: 10
Rules claiming more threads will be scaled down.
Singularity containers: ignored
Job stats:
job                        count
-----------------------  -------
all                            1
cp_segchek_gen_loaddata        1
total                          2

Select jobs to execute...
Execute 1 jobs...

[Thu Apr 17 12:05:38 2025]
localrule cp_segchek_gen_loaddata:
    input: /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/index/index.parquet
    output: /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/cellprofiler/loaddata/cp/segcheck/completed.txt
    jobid: 1
    reason: Forced execution
    resources: tmpdir=/tmp

starrynight segcheck loaddata -i /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/index/index.parquet -o /ho

Step 5: SBS calculate illum correction

In [23]:
# Gen load data
sbs_calc_illum_load_data_mod = SBSCalcIllumGenLoadDataModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    sbs_calc_illum_load_data_mod.pipe,
    backend_config,
    exec_runs / "run012",
    exec_mounts,
)
run = exec_backend.run()
run.wait()

# Gen cppipe file
sbs_calc_illum_cppipe_mod = SBSCalcIllumGenCPPipeModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    sbs_calc_illum_cppipe_mod.pipe,
    backend_config,
    exec_runs / "run013",
    exec_mounts,
)
run = exec_backend.run()
run.wait()

# Invoke cppipe file
sbs_calc_illum_invoke_mod = SBSCalcIllumInvokeCPModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    sbs_calc_illum_invoke_mod.pipe,
    backend_config,
    exec_runs / "run014",
    exec_mounts,
)
run = exec_backend.run()
run.wait()

# ------------------------------------------------------------------

Assuming unrestricted shared filesystem usage.
host: GPFDA-11A
Building DAG of jobs...
Using shell: /nix/store/8vpg72ik2kgxfj05lc56hkqrdrfl8xi9-bash-5.2p37/bin/bash
Provided cores: 10
Rules claiming more threads will be scaled down.
Singularity containers: ignored
Job stats:
job                            count
---------------------------  -------
all                                1
sbs_calc_illum_gen_loaddata        1
total                              2

Select jobs to execute...
Execute 1 jobs...

[Thu Apr 17 12:06:19 2025]
localrule sbs_calc_illum_gen_loaddata:
    input: /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/index/index.parquet
    output: /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/cellprofiler/loaddata/sbs/illum_calc/completed.txt
    jobid: 1
    reason: Forced execution
    resources: tmpdir=/tmp

starrynight illum calc loaddata -i /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_outp

Step 6: SBS apply illum correction

In [24]:
# Gen load data
sbs_apply_illum_load_data_mod = SBSApplyIllumGenLoadDataModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    sbs_apply_illum_load_data_mod.pipe,
    backend_config,
    exec_runs / "run015",
    exec_mounts,
)
run = exec_backend.run()
run.wait()

# Gen cppipe file
sbs_apply_illum_cppipe_mod = SBSApplyIllumGenCPPipeModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    sbs_apply_illum_cppipe_mod.pipe,
    backend_config,
    exec_runs / "run016",
    exec_mounts,
)
run = exec_backend.run()
run.wait()

# Invoke cppipe file
sbs_apply_illum_invoke_mod = SBSApplyIllumInvokeCPModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    sbs_apply_illum_invoke_mod.pipe,
    backend_config,
    exec_runs / "run017",
    exec_mounts,
)
run = exec_backend.run()
run.wait()

# ------------------------------------------------------------------

Assuming unrestricted shared filesystem usage.
host: GPFDA-11A
Building DAG of jobs...
Using shell: /nix/store/8vpg72ik2kgxfj05lc56hkqrdrfl8xi9-bash-5.2p37/bin/bash
Provided cores: 10
Rules claiming more threads will be scaled down.
Singularity containers: ignored
Job stats:
job                             count
----------------------------  -------
all                                 1
sbs_apply_illum_gen_loaddata        1
total                               2

Select jobs to execute...
Execute 1 jobs...

[Thu Apr 17 12:06:56 2025]
localrule sbs_apply_illum_gen_loaddata:
    input: /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/index/index.parquet
    output: /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/cellprofiler/loaddata/sbs/illum_apply/completed.txt
    jobid: 1
    reason: Forced execution
    resources: tmpdir=/tmp

starrynight illum apply loaddata -i /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_exam

pipeline_exception


[Thu Apr 17 12:07:23 2025]
Error in rule sbs_apply_illum_invoke_cp:
    jobid: 1
    input: /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/cellprofiler/cppipe/sbs/illum_apply/align_sbs.cppipe, /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/cellprofiler/loaddata/sbs/illum_apply/completed.txt
    output: /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/illum/sbs/illum_apply/completed.txt
    shell:
        starrynight cp -p /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/cellprofiler/cppipe/sbs/illum_apply/align_sbs.cppipe -l /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/cellprofiler/loaddata/sbs/illum_apply -o /home/ank/workspace/hub/broad/starrynight/scratch/starrynight_example_output/illum/sbs/illum_apply --sbs
        (one of the commands exited with non-zero exit code; note that snakemake uses bash strict mode!)

Shutting down, this migh

Step 7: SBS preprocess

In [None]:
# Gen load data
sbs_preprocess_load_data_mod = SBSPreprocessGenLoadDataModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    sbs_preprocess_load_data_mod.pipe,
    backend_config,
    exec_runs / "run018",
    exec_mounts,
)
run = exec_backend.run()
run.wait()

# Gen cppipe file
sbs_preprocess_cppipe_mod = SBSPreprocessGenCPPipeModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    sbs_preprocess_cppipe_mod.pipe,
    backend_config,
    exec_runs / "run019",
    exec_mounts,
)
run = exec_backend.run()
run.wait()

# Invoke cppipe file
sbs_preprocess_invoke_mod = SBSPreprocessInvokeCPModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    sbs_preprocess_invoke_mod.pipe,
    backend_config,
    exec_runs / "run020",
    exec_mounts,
)
run = exec_backend.run()
run.wait()
# ------------------------------------------------------------------

Step 9: Analysis

In [None]:
# Gen load data
analysis_load_data_mod = AnalysisGenLoadDataModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    analysis_load_data_mod.pipe,
    backend_config,
    exec_runs / "run021",
    exec_mounts,
)
run = exec_backend.run()
run.wait()

# Gen cppipe file
analysis_cppipe_mod = AnalysisGenCPPipeModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    analysis_cppipe_mod.pipe,
    backend_config,
    exec_runs / "run022",
    exec_mounts,
)
run = exec_backend.run()
run.wait()

# Invoke cppipe file
analysis_invoke_mod = AnalysisInvokeCPModule.from_config(
    data_config, pcp_experiment
)

exec_backend = SnakeMakeBackend(
    analysis_invoke_mod.pipe,
    backend_config,
    exec_runs / "run023",
    exec_mounts,
)
run = exec_backend.run()
run.wait()