# TXPipe Tutorial

## 1. Introduction

**TXPipe** is the DESC pipeline for *3×2pt* measurements, and it has been extended to support cluster cosmology analyses.  
It is built on top of **Ceci**, which organizes pipelines as sequences of smaller tasks called *stages*.  

Each stage is defined as a `PipelineStage`, a Python class with:
- **Inputs** and **outputs** (declared up front).
- A **configuration dictionary** with options.
- A **`run` method** where the actual computation happens.

To run pipelines, we use two YAML configuration files:
- **Pipeline config (`pipeline_config.yml`)**: defines which stages to run, input files, output directory, parallelization, etc.
- **Stage config (`stage_config.yml`)**: defines the parameters for each stage.

Let’s look at them step by step.


In [1]:
import os
import sys
from pprint import pprint

import ceci
import h5py
import matplotlib.pyplot as plt
import yaml

# Make a local TXPipe checkout importable (adjust this path to your own clone)
txpipe_dir = "/global/homes/e/edujb/TXPipe"
sys.path.append(txpipe_dir)
import txpipe



## 2. Pipeline Configuration

In [2]:
with open("pipeline_config.yml") as f:
    config_yaml = yaml.safe_load(f)
pprint(config_yaml)


{'config': './stage_config.yml',
 'inputs': {'calibration_table': '/global/cfs/projectdirs/lsst/groups/CL/cl_pipeline_project/TXPipe_data/cosmodc2/20deg2/sample_cosmodc2_w10year_errors.dat',
            'cluster_catalog': '/global/cfs/projectdirs/lsst/groups/CL/cl_pipeline_project/TXPipe_data/cosmodc2/20deg2/cluster_catalog.hdf5',
            'fiducial_cosmology': '/global/cfs/projectdirs/lsst/groups/CL/cl_pipeline_project/TXPipe_data/cosmodc2/fiducial_cosmology.yml',
            'shear_catalog': '/global/cfs/projectdirs/lsst/groups/CL/cl_pipeline_project/TXPipe_data/cosmodc2/20deg2/shear_catalog.hdf5',
            'source_photoz_pdfs': '/global/cfs/projectdirs/lsst/groups/CL/cl_pipeline_project/TXPipe_data/cosmodc2/20deg2/photoz_pdfs.hdf5',
            'spectroscopic_catalog': '/global/cfs/projectdirs/lsst/groups/CL/cl_pipeline_project/TXPipe_data/cosmodc2/20deg2/spectroscopic_catalog.hdf5'},
 'launcher': {'interval': 0.5, 'name': 'mini'},
 'log_dir': './logs',
 'modules': 'txpipe rai

This file contains the **general pipeline settings**:
- The input files (`inputs`) and the stage configuration file (`config`).
- Where logs and outputs are written.
- The launcher and the number of tasks used for parallelization.
- Which stages to run (e.g. `TXSourceSelectorMetadetect`, `BPZliteEstimator`, etc.).

If one stage produces a file that another stage needs, Ceci connects the two automatically by matching the output tag of the first stage to the input tag of the second.
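To make the idea of tag-based wiring concrete, here is a toy sketch (not Ceci's actual implementation) that orders stages by matching input tags to output tags. The tags come from stages discussed in this tutorial; `order_stages` is a hypothetical helper, and `cluster_shear_catalogs` is treated as an overall input for brevity, although in a real pipeline another stage would produce it.

```python
def order_stages(stages, overall_inputs):
    """Return stage names in an order that satisfies their tag dependencies."""
    available = set(overall_inputs)  # tags that exist so far
    remaining = dict(stages)         # stage name -> (input tags, output tags)
    ordered = []
    while remaining:
        # A stage is ready once all of its input tags are available
        ready = [name for name, (ins, _) in remaining.items() if set(ins) <= available]
        if not ready:
            missing = {tag for ins, _ in remaining.values() for tag in ins} - available
            raise ValueError(f"No stage or overall input provides: {sorted(missing)}")
        for name in ready:
            _, outs = remaining.pop(name)
            available.update(outs)   # its outputs can now feed later stages
            ordered.append(name)
    return ordered


stages = {
    "CLClusterEnsembleProfiles": (
        ["cluster_catalog_tomography", "fiducial_cosmology", "cluster_shear_catalogs"],
        ["cluster_profiles"],
    ),
    "CLClusterBinningRedshiftRichness": (["cluster_catalog"], ["cluster_catalog_tomography"]),
}
overall_inputs = ["cluster_catalog", "fiducial_cosmology", "cluster_shear_catalogs"]
print(order_stages(stages, overall_inputs))
# prints ['CLClusterBinningRedshiftRichness', 'CLClusterEnsembleProfiles']
```

The binning stage runs first because its output tag, `cluster_catalog_tomography`, is an input of the profile stage, regardless of the order in which the stages are listed.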

## 3. Stage Configuration

The stage configuration file holds the settings for each stage, grouped in a section named after the stage class (for example `BPZliteInformer` below).

In [3]:
with open("stage_config.yml") as f:
    config_yaml = yaml.safe_load(f)
pprint(config_yaml)


{'BPZliteInformer': {'bands': 'riz',
                     'columns_file': '/global/cfs/projectdirs/lsst/groups/CL/cl_pipeline_project/TXPipe/data/bpz_riz.columns',
                     'data_path': '/global/cfs/projectdirs/lsst/groups/CL/cl_pipeline_project/TXPipe_data/example/rail-bpz-inputs',
                     'gauss_kernel': 0.0,
                     'hdf5_groupname': 'photometry',
                     'inform_options': {'load_model': False,
                                        'modelfile': 'BPZpriormodel.out',
                                        'save_train': False},
                     'madau_reddening': False,
                     'mag_err_min': 0.005,
                     'nzbins': 301,
                     'p_min': 0.005,
                     'prior_band': 'i',
                     'prior_file': 'hdfn_gen',
                     'ref_band': 'i',
                     'spectra_file': 'CWWSB4.list',
                     'zmax': 3.0,
                     'zmin': 0.0,
    

## 4. Running Pipelines

We can run pipelines in different ways:

- **Terminal (recommended for real runs)**  
  ```bash
  ceci pipeline_config.yml
  ```

- **Notebook (for quick tests and exploration)**  
  Let’s try it here:

In [4]:
pipeline = ceci.Pipeline.read("pipeline_config.yml")
pprint(pipeline)

pipeline.run()


SES - yaml config file :  None
<ceci.pipeline.MiniPipeline object at 0x7fa6c8178460>

Executing TXSourceSelectorMetadetect
Command is:
srun -u -n 1 --cpus-per-task=1 --nodes 1 shifter --env OMP_NUM_THREADS=1  --image ghcr.io/lsstdesc/txpipe-dev  python3 -m txpipe TXSourceSelectorMetadetect   --shear_catalog=/global/cfs/projectdirs/lsst/groups/CL/cl_pipeline_project/TXPipe_data/cosmodc2/20deg2/shear_catalog.hdf5   --spectroscopic_catalog=/global/cfs/projectdirs/lsst/groups/CL/cl_pipeline_project/TXPipe_data/cosmodc2/20deg2/spectroscopic_catalog.hdf5   --config=./stage_config.yml   --shear_tomography_catalog=/global/cfs/projectdirs/lsst/groups/CL/cl_pipeline_project/TXPipe_data/cosmodc2/outputs-20deg2-CL//shear_tomography_catalog.hdf5   
Output writing to ./logs/TXSourceSelectorMetadetect.out

Job TXSourceSelectorMetadetect has failed with status 1



*************************************************
Error running pipeline stage TXSourceSelectorMetadetect.
Failed after 0.5 seconds.

Standard output and error streams in ./logs/TXSourceSelectorMetadetect.out
*************************************************


1

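⚠️ Notebook runs can fail, especially for parallel stages. Here the first stage, `TXSourceSelectorMetadetect`, failed and `pipeline.run()` returned the nonzero status `1`; the details are in `./logs/TXSourceSelectorMetadetect.out`.  
Best practice: run full pipelines from the terminal, or in a batch job with `srun`.  
In notebooks, it is often easier to run **single stages** interactively.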
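When a stage fails, the first thing to check is its log file. A small helper like the hypothetical `tail` below prints the end of the log from inside the notebook; the log path is the one reported in the failure message above.

```python
from collections import deque
from pathlib import Path


def tail(path, n=20):
    """Return the last n lines of a text file, such as a Ceci stage log."""
    with open(path, errors="replace") as f:
        # A deque with maxlen keeps only the most recent n lines while iterating
        return list(deque(f, maxlen=n))


log_file = Path("./logs/TXSourceSelectorMetadetect.out")
if log_file.exists():
    print("".join(tail(log_file, n=30)))
else:
    print(f"No log found at {log_file}")
```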


## 5. Running a Stage Manually

Instead of the full pipeline, we can configure and run a single stage.  
Here’s an example with `CLClusterBinningRedshiftRichness`.

In [5]:
outdir = "/global/homes/e/edujb/CLPipeline/tutorials/txpipe_tutorial/example_output"
cluster_catalog_nersc = "/global/cfs/projectdirs/lsst/groups/CL/cl_pipeline_project/TXPipe_data/cosmodc2/20deg2/cluster_catalog.hdf5"

# Create output directory
os.makedirs(outdir, exist_ok=True)

# Configure the stage: redshift and richness bin edges, plus input/output paths
stage = txpipe.CLClusterBinningRedshiftRichness.make_stage(
    zedge=[0.2, 0.4, 0.6, 0.8],
    richedge=[5., 10., 20., 25., 50.],
    cluster_catalog=cluster_catalog_nersc,
    cluster_catalog_tomography=f"{outdir}/cluster_catalog_tomography.hdf5",
)

# Run the stage
stage.run()

# Move the stage outputs to their final locations
stage.finalize()
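The bin edges above define 3 redshift bins and 4 richness bins, 12 bins in total. The sketch below shows how a single cluster could be assigned to one of them. It assumes half-open bins `[low, high)` and uses a hypothetical `bin_index` helper, so treat it as an illustration of the binning idea rather than the stage's own code.

```python
from bisect import bisect_right

zedge = [0.2, 0.4, 0.6, 0.8]
richedge = [5.0, 10.0, 20.0, 25.0, 50.0]


def bin_index(value, edges):
    """Return i with edges[i] <= value < edges[i + 1], or None if value is outside all bins."""
    i = bisect_right(edges, value) - 1
    return i if 0 <= i < len(edges) - 1 else None


n_bins = (len(zedge) - 1) * (len(richedge) - 1)
print(n_bins)  # 12

# A hypothetical cluster at z = 0.45 with richness 22
print(bin_index(0.45, zedge), bin_index(22.0, richedge))  # 1 2
```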

## 6. CLClusterEnsembleProfiles Example

Now let’s focus on a concrete stage to better understand how TXPipe stages work.  
We will analyze **`CLClusterEnsembleProfiles`**, a stage responsible for taking the individual shear profiles of each cluster and combining them into a stacked profile using **CLMM**.

If we look at the source code ([link](https://github.com/LSSTDESC/TXPipe/blob/master/txpipe/extensions/cluster_counts/make_ensemble_profile.py)), we find the following:


```python
class CLClusterEnsembleProfiles(CLClusterShearCatalogs):
    name = "CLClusterEnsembleProfiles"
    inputs = [
        #("cluster_catalog", HDFFile),
        ("cluster_catalog_tomography", HDFFile), # TO TEST
        ("fiducial_cosmology", FiducialCosmology),
        ("cluster_shear_catalogs", HDFFile),
    ]

    outputs = [
        ("cluster_profiles",  PickleFile),
    ]

    config_options = {
        #radial bin definition
        "r_min" : 0.2, #in Mpc
        "r_max" : 3.0, #in Mpc
        "nbins" : 5, # number of bins
        #type of profile
        "delta_sigma_profile" : True,
        "shear_profile" : False,
        "magnification_profile" : False,
        #coordinate_system for shear
        #"coordinate_system" : 'euclidean' #Must be either 'celestial' or 'euclidean'
    }

    def run(self):
        import sklearn.neighbors
        import astropy
        import h5py
        import clmm
        import clmm.cosmology.ccl
```

From this definition we can observe:

- **Inputs**: three declared inputs (`cluster_catalog_tomography`, `fiducial_cosmology`, `cluster_shear_catalogs`). Because the class redefines `inputs`, under ordinary Python class-attribute rules this list replaces the parent’s (note the commented-out `cluster_catalog` entry); the class still inherits the methods of `CLClusterShearCatalogs`.  
- **Output**: one file, `cluster_profiles`, written as a pickle.  
- **Configuration options**: defaults for the radial binning and the profile type. These can be overridden in the stage configuration file.  
- **`run` method**: the core computation, with imports placed *inside* the method so they are only loaded when the stage actually runs.
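How a default ends up being overridden can be sketched with plain dictionaries. This is a hypothetical illustration, not Ceci's code: it assumes the precedence, from lowest to highest, class defaults, then the stage's section in `stage_config.yml`, then arguments passed to `make_stage` (or on the command line). The override values are made up for the example.

```python
# Defaults copied from CLClusterEnsembleProfiles.config_options
defaults = {
    "r_min": 0.2,
    "r_max": 3.0,
    "nbins": 5,
    "delta_sigma_profile": True,
    "shear_profile": False,
    "magnification_profile": False,
}


def resolve_config(defaults, stage_section=None, overrides=None):
    """Merge configuration layers; later layers win (assumed precedence)."""
    config = dict(defaults)           # copy, so the class defaults stay untouched
    config.update(stage_section or {})
    config.update(overrides or {})
    return config


stage_section = {"nbins": 10}  # hypothetical entry in stage_config.yml
overrides = {"r_max": 5.0}     # hypothetical keyword passed to make_stage()
config = resolve_config(defaults, stage_section, overrides)
print(config["r_min"], config["r_max"], config["nbins"])  # 0.2 5.0 10
```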

---

Looking a bit deeper, inside the `run` method we see:

```python
# Excerpt from CLClusterEnsembleProfiles.run()
with self.open_input("fiducial_cosmology", wrapper=True) as f:
    ccl_cosmo = f.to_ccl()
    self.clmm_cosmo = clmm.cosmology.ccl.CCLCosmology()
    self.clmm_cosmo.set_be_cosmo(ccl_cosmo)

# load cluster catalog as an astropy table
# clusters = self.load_cluster_catalog()

if self.config["delta_sigma_profile"] == True:
    ...  # ΔΣ profile computation continues here
```
This highlights some important features:

- Stages use **built-in methods** (inherited from Ceci’s `PipelineStage`) to open their inputs and outputs and to read their configuration.  
- For example, `self.open_input("fiducial_cosmology", wrapper=True)` opens the file and, because of `wrapper=True`, returns the data-type object declared in `inputs` (here `FiducialCosmology`) instead of a raw file handle; that object provides `to_ccl()`, which builds a CCL cosmology.  
- Configuration values are available through `self.config`. Here, whether a ΔΣ profile is computed depends on the boolean flag `delta_sigma_profile`.
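The difference between a raw file handle and a wrapper object can be illustrated with a toy context manager. Everything here is hypothetical: `ToyCosmologyFile`, its `to_params()` method (a stand-in for `to_ccl()`), and the JSON file format are assumptions chosen to keep the sketch self-contained; Ceci's real `open_input` is more general.

```python
import json
import os
import tempfile
from contextlib import contextmanager


class ToyCosmologyFile:
    """Hypothetical wrapper: owns the file handle and adds a domain-specific method."""

    def __init__(self, path):
        self.file = open(path)

    def to_params(self):
        # Stand-in for FiducialCosmology.to_ccl(): turn the file into a usable object
        self.file.seek(0)
        return json.load(self.file)

    def close(self):
        self.file.close()


@contextmanager
def open_input(path, wrapper=False):
    """Yield the wrapper object if wrapper=True, otherwise the raw file handle."""
    obj = ToyCosmologyFile(path)
    try:
        yield obj if wrapper else obj.file
    finally:
        obj.close()


with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as tmp:
    json.dump({"Omega_c": 0.25, "h": 0.7}, tmp)  # hypothetical parameter values

with open_input(tmp.name, wrapper=True) as f:
    params = f.to_params()
os.remove(tmp.name)
print(params["h"])  # 0.7
```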



## 7. Useful Stage Methods

All stages inherit utility methods from Ceci’s `PipelineStage`; for the full list, see the [Ceci `PipelineStage` class](https://github.com/LSSTDESC/ceci/blob/master/ceci/stage.py).  

Some useful examples include:  

- `stage.get_output("cluster_catalog_tomography")` → get the path of a specific output.  
- `stage.print_io()` → print all inputs and outputs for the stage.  
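- `stage.find_outputs(outdir)` → return the expected path of every output inside a directory. It prepends `outdir` to each configured output path, so an output given as a full path (as in the example below) ends up with the directory twice.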


In [6]:
tomo_path = stage.get_output("cluster_catalog_tomography")  # Path of a specific output
stage.print_io()                                             # Print input/output info
stage.find_outputs(outdir)                                   # Expected output paths in outdir


Inputs--------
cluster_catalog      : cluster_catalog      :<class 'txpipe.data_types.base.HDFFile'> : /global/cfs/projectdirs/lsst/groups/CL/cl_pipeline_project/TXPipe_data/cosmodc2/20deg2/cluster_catalog.hdf5
Outputs--------
cluster_catalog_tomography : cluster_catalog_tomography :<class 'txpipe.data_types.base.HDFFile'> : /global/homes/e/edujb/CLPipeline/tutorials/txpipe_tutorial/example_output/cluster_catalog_tomography.hdf5


{'cluster_catalog_tomography': '/global/homes/e/edujb/CLPipeline/tutorials/txpipe_tutorial/example_output//global/homes/e/edujb/CLPipeline/tutorials/txpipe_tutorial/example_output/cluster_catalog_tomography.hdf5'}
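One way to build output paths without doubling the directory when a path is already absolute is `os.path.join`, which discards the earlier components when a later argument is absolute. The helper `expected_output_path` below is a hypothetical sketch, not a Ceci method, and it assumes POSIX-style paths.

```python
import os


def expected_output_path(outdir, filename):
    """Join outdir and filename; an absolute filename is returned unchanged."""
    # os.path.join drops earlier components when a later one is absolute
    return os.path.join(outdir, filename)


out_dir = "/global/homes/e/edujb/CLPipeline/tutorials/txpipe_tutorial/example_output"
print(expected_output_path(out_dir, "cluster_catalog_tomography.hdf5"))
print(expected_output_path(out_dir, f"{out_dir}/cluster_catalog_tomography.hdf5"))
```

Both calls print the same path, whereas plain string concatenation would reproduce the doubled path shown above.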

## 8. Creating a Custom Stage: `WorkshopStage`
Let’s build a toy stage that:
1. Reads a pickle input file.
2. Stores its contents together with a configurable number (`number`).
3. Saves the result as another pickle.

This shows the minimal structure every stage needs: declared inputs and outputs, `config_options`, and a `run` method.


In [9]:
from txpipe import PipelineStage
from txpipe.data_types import PickleFile
import pickle as pkl
import sys

# Workaround for Jupyter: give __main__ a filename
sys.modules['__main__'].__file__ = "notebook_stage.py"

class WorkshopStage(PipelineStage):
    name = "simple_stage"
    allow_reload = True   # allow redefining in notebooks

    inputs = [
        ("simple_file", PickleFile),
    ]

    outputs = [
        ("processed_file", PickleFile),
    ]

    config_options = {
        "print_statement": False,
        "number": 1.0,
    }

    def run(self):
        infile = self.get_input("simple_file")
        with open(infile, "rb") as f:
            data = pkl.load(f)

        number = self.config["number"]
        data_out = {"original": data, "scaled_by": number}

        outfile = self.get_output("processed_file")
        with open(outfile, "wb") as f:    
            pkl.dump(data_out, f)

        if self.config["print_statement"]:
            print(f"Wrote processed file to {outfile}")
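The stage reads `simple_file.pkl`, which is not created anywhere in this notebook. A minimal way to produce an equivalent input is to pickle a small dictionary; the contents below are copied from the `original` entry printed after the stage runs, and the filename matches the `simple_file` argument passed to `make_stage`.

```python
import pickle as pkl

# Same contents as the 'original' entry shown in the stage output
simple_data = {
    "galaxy_count": 42,
    "positions": [(1.2, 3.4), (2.1, 4.3), (3.2, 1.5)],
    "metadata": {"survey": "demo", "version": 1.0},
}

with open("simple_file.pkl", "wb") as f:
    pkl.dump(simple_data, f)
```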


In [10]:
# Run the stage
stage_workshop = WorkshopStage.make_stage(
    processed_file=f"{outdir}/processed_file.pkl",
    simple_file="simple_file.pkl",
    number=2.0,
    print_statement=True,
)

stage_workshop.run()
# finalize() closes the outputs, checks they were produced, and moves them
# from their temporary "inprogress_" names to the final paths
stage_workshop.finalize()

# Inspect the result
with open(f"{outdir}/processed_file.pkl", "rb") as f:
    print(pkl.load(f))


Wrote processed file to /global/homes/e/edujb/CLPipeline/tutorials/txpipe_tutorial/example_output/inprogress_processed_file.pkl
{'original': {'galaxy_count': 42, 'positions': [(1.2, 3.4), (2.1, 4.3), (3.2, 1.5)], 'metadata': {'survey': 'demo', 'version': 1.0}}, 'scaled_by': 2.0}
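The printed path ends in `inprogress_processed_file.pkl` because `get_output` returned a temporary name while the stage was running; after `finalize()`, the file is available at `processed_file.pkl`, which is why reading the final path works. The same write-then-rename pattern can be sketched in plain Python; `write_then_finalize` is a hypothetical helper illustrating the idea, not Ceci's code.

```python
import os
import pickle as pkl
import tempfile


def write_then_finalize(data, final_path):
    """Write to an 'inprogress_' file, then rename it to final_path only once writing succeeded."""
    directory, name = os.path.split(final_path)
    tmp_path = os.path.join(directory, f"inprogress_{name}")
    with open(tmp_path, "wb") as f:
        pkl.dump(data, f)
    # os.replace is atomic on POSIX when both paths are on the same filesystem
    os.replace(tmp_path, final_path)
    return final_path


with tempfile.TemporaryDirectory() as tmpdir:
    write_then_finalize({"scaled_by": 2.0}, os.path.join(tmpdir, "processed_file.pkl"))
    print(os.listdir(tmpdir))  # ['processed_file.pkl']
```

A presumable benefit of this design is that a stage that crashes midway leaves nothing under the final name, so downstream stages cannot mistake a partial file for a finished one.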


## 9. Summary

We have seen:
- How TXPipe organizes work into stages.
- How pipeline and stage configuration files connect everything.
- How to run pipelines (full or partial).
- How to explore an existing stage (`CLClusterEnsembleProfiles`).
- How to build and run a custom stage (`WorkshopStage`).

Next steps:
- Try editing config YAMLs to see how stages change behavior.
- Explore other stages in the [TXPipe repo](https://github.com/LSSTDESC/TXPipe).
- Experiment with more complex inputs/outputs in your own `PipelineStage`.
