# Build, Save, Load, and Run a Pipeline

**Author:** Eric Charles

**Last Run Successfully:** December 22, 2023

This notebook shows how to:

1. Build a simple interactive RAIL pipeline,

2. Save that pipeline (including configuration) to a YAML file,

3. Load that pipeline from the saved YAML file,

4. Run the loaded pipeline.

In [1]:
import os
import numpy as np
import ceci
import rail
import tables_io
from rail.core.stage import RailStage
from rail.creation.degraders.spectroscopic_degraders import LineConfusion
from rail.creation.degraders.quantityCut import QuantityCut
from rail.creation.degraders.photometric_errors import LSSTErrorModel
from rail.creation.engines.flowEngine import FlowCreator, FlowPosterior
from rail.tools.flow_handle import FlowHandle
from rail.core.data import TableHandle
from rail.tools.table_tools import ColumnMapper, TableConverter

## Build the pipeline

### Some configuration setup

This example assembles some of RAIL's creation functionality into a pipeline.

Here we define:

1. The location of the pretrained PZFlow file used with this example.

2. The bands we will be generating data for.

3. The names of the columns where we will be writing the error estimates.

4. The grid of redshifts we use for posterior estimation.

In [2]:
from rail.utils.path_utils import find_rail_file

flow_file = find_rail_file("examples_data/goldenspike_data/data/pretrained_flow.pkl")
bands = ["u", "g", "r", "i", "z", "y"]
band_dict = {band: f"mag_{band}_lsst" for band in bands}
rename_dict = {f"mag_{band}_lsst_err": f"mag_err_{band}_lsst" for band in bands}
post_grid = [float(x) for x in np.linspace(0.0, 5, 21)]
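
To make these settings concrete, the sketch below rebuilds the same dictionaries and prints a few entries. It uses only plain Python: the grid is computed without NumPy purely for illustration, and should agree with the `np.linspace(0.0, 5, 21)` call above up to floating-point rounding.

```python
bands = ["u", "g", "r", "i", "z", "y"]

# Maps each band to the magnitude column name used in this example.
band_dict = {band: f"mag_{band}_lsst" for band in bands}

# Maps the error-column names on the left to the names on the right.
rename_dict = {f"mag_{band}_lsst_err": f"mag_err_{band}_lsst" for band in bands}

# 21 evenly spaced redshifts from 0.0 to 5.0 (spacing 0.25), in plain Python.
post_grid = [5.0 * i / 20 for i in range(21)]

print(band_dict["u"])                  # mag_u_lsst
print(rename_dict["mag_u_lsst_err"])   # mag_err_u_lsst
print(len(post_grid), post_grid[0], post_grid[1], post_grid[-1])  # 21 0.0 0.25 5.0
```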

### Define the pipeline stages

The `RailStage` base class defines the `make_stage` classmethod, which lets us create a stage of any particular type in a uniform way.

Note that we pass the configuration parameters to each pipeline stage as keyword arguments. The names of the parameters depend on the stage type.

A couple of things are important:

1. Each stage should have a unique name. In `ceci`, stage names default to the name of the class (e.g., FlowCreator, or LSSTErrorModel); this would be problematic if you wanted two stages of the same type in a given pipeline, so be sure to assign each stage its own name.

2. At this point, we aren't actually worrying about the connections between the stages.
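
The need for unique names can be pictured with a small, library-free sketch. `ToyPipeline` below is a hypothetical stand-in, not `ceci`'s implementation, and how `ceci` itself reacts to a name clash is not shown in this notebook. The toy only illustrates that when stages are looked up by name, two stages of the same type need different names.

```python
class ToyPipeline:
    """Hypothetical stand-in for a pipeline; not ceci's implementation."""

    def __init__(self):
        self.stages = {}  # stage name -> stage configuration

    def add_stage(self, name, config):
        # Refuse duplicate names so one stage cannot silently replace another.
        if name in self.stages:
            raise ValueError(f"duplicate stage name: {name!r}")
        self.stages[name] = config


toy = ToyPipeline()
toy.add_stage("error_model_a", {"seed": 1})
toy.add_stage("error_model_b", {"seed": 2})  # same stage type, different name

try:
    toy.add_stage("error_model_a", {"seed": 3})  # reuses an existing name
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True

print(sorted(toy.stages), duplicate_rejected)
```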

In [6]:
# Create a data handle for the pretrained flow file
flow_handle = FlowHandle("model", path=flow_file)

In [7]:
flow_engine_test = FlowCreator.make_stage(
    name="flow_engine_test", model=flow_handle, n_samples=50
)

lsst_error_model_test = LSSTErrorModel.make_stage(
    name="lsst_error_model_test", renameDict=band_dict
)

col_remapper_test = ColumnMapper.make_stage(
    name="col_remapper_test", hdf5_groupname="", columns=rename_dict
)

flow_post_test = FlowPosterior.make_stage(
    name="flow_post_test", column="redshift", flow=flow_file, grid=post_grid
)

table_conv_test = TableConverter.make_stage(
    name="table_conv_test", output_format="numpyDict", seed=12345
)

An NVIDIA GPU may be present on this machine, but a CUDA-enabled jaxlib is not installed. Falling back to cpu.


Inserting handle into data store.  model: <class 'rail.tools.flow_handle.FlowHandle'> /home/jscora/code/desc-rail/rail_base/src/rail/examples_data/goldenspike_data/data/pretrained_flow.pkl, (w), flow_engine_test


In [8]:
flow_engine_test.sample(6, seed=0).data

Inserting handle into data store.  output_flow_engine_test: inprogress_output_flow_engine_test.pq, flow_engine_test


| | mag_z_lsst | mag_y_lsst | mag_r_lsst | mag_i_lsst | redshift | mag_u_lsst | mag_g_lsst |
|---|---|---|---|---|---|---|---|
| 0 | 22.734173 | 22.580509 | 24.02784 | 22.996883 | 0.739296 | 25.822563 | 25.088484 |
| 1 | 22.66666 | 22.415169 | 23.99596 | 23.025354 | 0.644894 | 27.391764 | 25.44791 |
| 2 | 21.013422 | 20.779903 | 21.757725 | 21.298742 | 0.348834 | 23.999668 | 22.884043 |
| 3 | 24.050123 | 23.81822 | 24.27183 | 24.152014 | 1.623727 | 24.338676 | 24.272142 |
| 4 | 23.140982 | 23.09551 | 23.577923 | 23.19063 | 0.551647 | 25.13657 | 24.586559 |
| 5 | 21.022926 | 20.927565 | 22.416426 | 21.435768 | 0.839687 | 23.225256 | 23.068544 |


### Make the pipeline and add the stages

Here we make an empty interactive pipeline (interactive in the sense that it will be run locally, rather than using the batch submission mechanisms built into `ceci`), and add the stages to that pipeline.

In [9]:
pipe = ceci.Pipeline.interactive()
stages = [flow_engine_test, lsst_error_model_test, col_remapper_test, table_conv_test]
for stage in stages:
    pipe.add_stage(stage)

### Interactive introspection

Here are some examples of interactive introspection into the pipeline, i.e., functions you can use to figure out what the pipeline is doing.

In [10]:
# Get the names of the stages
pipe.stage_names

['flow_engine_test',
 'lsst_error_model_test',
 'col_remapper_test',
 'table_conv_test']

In [11]:
# Get the configuration of a particular stage
pipe.flow_engine_test.config

StageConfig{output_mode:default,n_samples:6,seed:0,name:flow_engine_test,model:/home/jscora/code/desc-rail/rail_base/src/rail/examples_data/goldenspike_data/data/pretrained_flow.pkl,config:None,}

In [12]:
# Get the list of output 'tags'
# These are how the stage thinks of the outputs: as a list of names associated with DataHandle types.
pipe.flow_engine_test.outputs

[('output', rail.core.data.PqHandle)]

In [13]:
# Get the 'aliased tags' of the outputs
# These are how the pipeline thinks of the outputs: as unique keys that each point to a particular file
pipe.flow_engine_test._outputs

{'output_flow_engine_test': 'output_flow_engine_test.pq'}
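
The difference between a tag and an aliased tag can be sketched in a few lines. Every stage here uses the same local tag, `output`, so the pipeline needs a key that stays unique across stages. The helper `aliased_tag` is hypothetical and simply mimics the `output_<stage name>` pattern visible in the result above. It is not taken from the `ceci` source.

```python
def aliased_tag(tag, stage_name):
    """Hypothetical helper: combine a stage-local tag with the stage name."""
    return f"{tag}_{stage_name}"


stage_names = ["flow_engine_test", "lsst_error_model_test", "col_remapper_test"]

# Every stage shares the local tag "output", yet each alias is distinct.
aliases = {name: aliased_tag("output", name) for name in stage_names}

for name, alias in aliases.items():
    print(f"{name}: output -> {alias}")
```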

### Connect up the pipeline stages

We can use the `RailStage.connect_input` function to connect one stage to another.
By default, this connects the output data product called `output` of the upstream stage to the input called `input` of the stage on which `connect_input` is called.

In [14]:
lsst_error_model_test.connect_input(flow_engine_test)
col_remapper_test.connect_input(lsst_error_model_test)
# flow_post_test.connect_input(col_remapper_test, inputTag='input')
table_conv_test.connect_input(col_remapper_test)

Inserting handle into data store.  output_flow_engine_test: None, lsst_error_model_test
Inserting handle into data store.  output_lsst_error_model_test: inprogress_output_lsst_error_model_test.pq, lsst_error_model_test
Inserting handle into data store.  output_lsst_error_model_test: None, col_remapper_test
Inserting handle into data store.  output_col_remapper_test: inprogress_output_col_remapper_test.pq, col_remapper_test
Inserting handle into data store.  output_col_remapper_test: None, table_conv_test
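
The wiring reported in the messages above forms a simple linear chain: each stage consumes the output of the stage before it. The sketch below reproduces that chain in plain Python. `chain_connections` is a hypothetical helper written for illustration. It assumes the default `output` tag and the `output_<stage name>` naming seen in the log, and it does not call RAIL or `ceci`.

```python
def chain_connections(stage_names, tag="output"):
    """Hypothetical helper: list (upstream data key, consuming stage) pairs."""
    connections = []
    # Pair each stage with the one that follows it in the list.
    for upstream, downstream in zip(stage_names, stage_names[1:]):
        connections.append((f"{tag}_{upstream}", downstream))
    return connections


stage_order = [
    "flow_engine_test",
    "lsst_error_model_test",
    "col_remapper_test",
    "table_conv_test",
]

connections = chain_connections(stage_order)
for data_key, consumer in connections:
    print(f"{data_key} -> {consumer}")
```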


### Initialize the pipeline

This will do a few things:

1. Attach any global pipeline inputs that were not specified in the connections above. In our case, the input flow file is pre-existing and must be specified as a global input.

2. Specify output and logging directories.

3. Optionally, create the pipeline in 'resume' mode, where it will ignore stages if all of their output already exists.

In [15]:
pipe.initialize(
    dict(model=flow_file), dict(output_dir=".", log_dir=".", resume=False), None
)

(({'flow_engine_test': <Job flow_engine_test>,
   'lsst_error_model_test': <Job lsst_error_model_test>,
   'col_remapper_test': <Job col_remapper_test>,
   'table_conv_test': <Job table_conv_test>},
  [<rail.creation.engines.flowEngine.FlowCreator at 0x707a84a92780>,
   <rail.creation.degraders.photometric_errors.LSSTErrorModel at 0x707a8011f650>,
   Stage that applies remaps the following column names in a pandas DataFrame:
   f{str(self.config.columns)},
   <rail.tools.table_tools.TableConverter at 0x707a80294c80>]),
 {'output_dir': '.', 'log_dir': '.', 'resume': False})
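
The 'resume' behavior described above can be pictured as a check on each stage's output files. The function `should_skip` below is a hypothetical illustration of that rule (skip only when resuming and every output already exists). It is a sketch under that assumption, not `ceci`'s actual logic.

```python
import os
import tempfile


def should_skip(output_paths, resume):
    """Hypothetical check: skip only in resume mode when every output exists."""
    return resume and all(os.path.exists(p) for p in output_paths)


with tempfile.TemporaryDirectory() as tmp:
    done = os.path.join(tmp, "output_done.pq")
    missing = os.path.join(tmp, "output_missing.pq")
    open(done, "w").close()  # pretend an earlier run produced this file

    skip_complete = should_skip([done], resume=True)           # all outputs exist
    skip_partial = should_skip([done, missing], resume=True)   # one output missing
    skip_no_resume = should_skip([done], resume=False)         # resume disabled

print(skip_complete, skip_partial, skip_no_resume)  # True False False
```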

## Save the pipeline

This actually writes two files (as this is what `ceci` expects):

1. `pipe_saved.yml`, which will have a list of stages, with instructions on how to execute the stages (e.g., run this stage in parallel on 20 processors). For an interactive pipeline, those instructions will be trivial.

2. `pipe_saved_config.yml`, which will have a dictionary of configurations for each stage.

In [16]:
pipe.save("pipe_saved.yml")
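
A quick way to confirm that both files were written is to check for the pair on disk. The helper `config_path_for` is hypothetical. It assumes the companion file is named by appending `_config` to the pipeline file's stem, which matches the `--config=pipe_saved_config.yml` argument that appears when the pipeline is run.

```python
from pathlib import Path


def config_path_for(pipeline_path):
    """Hypothetical helper: derive the companion config file name."""
    p = Path(pipeline_path)
    return p.with_name(f"{p.stem}_config{p.suffix}")


pipeline_file = Path("pipe_saved.yml")
config_file = config_path_for(pipeline_file)

# Report whether each file of the pair is present in the working directory.
for path in (pipeline_file, config_file):
    status = "found" if path.exists() else "missing"
    print(f"{path}: {status}")
```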

## Read the saved pipeline

In [17]:
pr = ceci.Pipeline.read("pipe_saved.yml")

Inserting handle into data store.  model: /home/jscora/code/desc-rail/rail_base/src/rail/examples_data/goldenspike_data/data/pretrained_flow.pkl, flow_engine_test


## Run the newly read pipeline

This launches a separate Unix process for each stage of the pipeline; the command executed for each stage is printed in the output.

In [18]:
pr.run()


Executing flow_engine_test
Command is:
OMP_NUM_THREADS=1   python3 -m ceci rail.creation.engines.flowEngine.FlowCreator   --model=/home/jscora/code/desc-rail/rail_base/src/rail/examples_data/goldenspike_data/data/pretrained_flow.pkl   --name=flow_engine_test   --config=pipe_saved_config.yml   --output=./output_flow_engine_test.pq 
Output writing to ./flow_engine_test.out

Job flow_engine_test has completed successfully in 6.0 seconds seconds !

Executing lsst_error_model_test
Command is:
OMP_NUM_THREADS=1   python3 -m ceci rail.creation.degraders.photometric_errors.LSSTErrorModel   --input=./output_flow_engine_test.pq   --name=lsst_error_model_test   --config=pipe_saved_config.yml   --output=./output_lsst_error_model_test.pq 
Output writing to ./lsst_error_model_test.out

Job lsst_error_model_test has completed successfully in 3.0 seconds seconds !

Executing col_remapper_test
Command is:
OMP_NUM_THREADS=1   python3 -m ceci rail.tools.table_tools.ColumnMapper   --input=./output_lsst_e

0

## Running saved pipelines from the command line
Once you've saved a pipeline and have the `pipeline_name.yml` and `pipeline_name_config.yml` file pair, you can run the pipeline from the command line instead, if you prefer. With [ceci](https://github.com/LSSTDESC/ceci) installed in your environment, run `ceci path/to/the/pipeline.yml`. Running the pipeline we've just made would look like:

In [None]:
! ceci pipe_saved.yml
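
If you would rather launch the same command from a Python script than from a shell, a minimal sketch using the standard library's `subprocess` module might look like the following. The function names `ceci_command` and `run_saved_pipeline` are hypothetical. The only assumption about `ceci` is the `ceci <pipeline.yml>` invocation shown above.

```python
import shutil
import subprocess


def ceci_command(pipeline_yaml):
    """Build the argument list for `ceci <pipeline.yml>`."""
    return ["ceci", str(pipeline_yaml)]


def run_saved_pipeline(pipeline_yaml):
    """Run the saved pipeline; return the exit code, or None if ceci is not on PATH."""
    if shutil.which("ceci") is None:
        return None
    completed = subprocess.run(ceci_command(pipeline_yaml), check=False)
    return completed.returncode


command = ceci_command("pipe_saved.yml")
print(" ".join(command))  # ceci pipe_saved.yml
```

Calling `run_saved_pipeline("pipe_saved.yml")` from the directory that holds both YAML files would then start the run; a return value of `0` conventionally indicates success.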