# RAIL Pipeline example notebook

author: Eric Charles<br>
last run successfully: April 26, 2023<br>

This notebook shows how to:
1. Build a simple RAIL pipeline interactively,
2. Save that pipeline (including configuration) to a yaml file,
3. Load that pipeline from the saved yaml file,
4. Run the loaded pipeline.

In [None]:
import os
import numpy as np
import ceci
import rail
from rail.core.stage import RailStage
from rail.creation.degradation import LSSTErrorModel, InvRedshiftIncompleteness, LineConfusion, QuantityCut
from rail.creation.engines.flowEngine import FlowCreator, FlowPosterior
from rail.core.data import TableHandle
from rail.core.utilStages import ColumnMapper, TableConverter

We'll start by setting up the RAIL data store. RAIL uses [ceci](https://github.com/LSSTDESC/ceci), which is designed for pipelines rather than interactive notebooks; the data store works around that and lets us use data interactively.
When working interactively, we want to allow overwriting data in the RAIL data store to avoid errors if we re-run cells.
See the `rail/examples/goldenspike/goldenspike.ipynb` example notebook for more details on the Data Store.
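
The reason for allowing overwrites can be illustrated with a toy dictionary-backed store. `ToyDataStore` and the key name below are hypothetical and are not RAIL's actual data store class; only the idea of a class-level `allow_overwrite` flag comes from the next cell, which sets it through `DS.__class__`.

```python
class ToyDataStore(dict):
    """Hypothetical dict-backed store illustrating an overwrite guard (not RAIL's class)."""

    # Class-level flag, mirroring how the notebook sets DS.__class__.allow_overwrite
    allow_overwrite = False

    def __setitem__(self, key, value):
        # Refuse to silently replace an existing entry unless overwriting is enabled
        if key in self and not self.allow_overwrite:
            raise ValueError(f"{key!r} is already in the data store")
        super().__setitem__(key, value)


toy_store = ToyDataStore()
toy_store['flow_engine_output'] = 'first run'

toy_error = None
try:
    # Re-running a notebook cell would try to store the same key again
    toy_store['flow_engine_output'] = 'second run'
except ValueError as err:
    toy_error = str(err)
    print('without overwrite:', toy_error)

# Flip the flag on the class, as the notebook does, and the re-run succeeds
ToyDataStore.allow_overwrite = True
toy_store['flow_engine_output'] = 'second run'
print('with overwrite:', toy_store['flow_engine_output'])
```

Setting the flag on the class rather than the instance means every store of that type picks it up, which is presumably why the notebook goes through `DS.__class__`.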

In [None]:
DS = RailStage.data_store
DS.__class__.allow_overwrite = True

### Some configuration setup

This example builds some of the RAIL creation functionality into a pipeline.

Here we are defining:
1. The location of the pretrained PZFlow file used with this example.
2. The bands we will be generating data for, and the magnitude column name for each band.
3. How to rename the columns where the error estimates are written (e.g., `mag_u_lsst_err` becomes `mag_err_u_lsst`).
4. The grid of redshifts we use for posterior estimation.

In [None]:
from rail.core.utils import RAILDIR
flow_file = os.path.join(RAILDIR, 'rail/examples_data/goldenspike_data/data/pretrained_flow.pkl')
bands = ['u','g','r','i','z','y']
band_dict = {band:f'mag_{band}_lsst' for band in bands}
rename_dict = {f'mag_{band}_lsst_err':f'mag_err_{band}_lsst' for band in bands}
post_grid = [float(x) for x in np.linspace(0., 5, 21)]

### Define the pipeline stages

The `RailStage` base class defines the `make_stage` classmethod, which lets us make a stage of any particular type in a uniform way.

Note that we are passing the configuration parameters to each pipeline stage as keyword arguments.

The names of the parameters will depend on the stage type.

A couple of things are important:
1. Each stage should have a unique name. In `ceci`, stage names default to the name of the class (e.g., `FlowCreator` or `LSSTErrorModel`); this would be problematic if you wanted two stages of the same type in a given pipeline, so be sure to assign each stage its own name.
2. At this point, we aren't actually worrying about the connections between the stages.
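
Both points can be illustrated with a toy factory. `ToyStage`, `ToyErrorModel`, `ToyPipeline`, and the rule that a duplicate name raises an error are hypothetical simplifications written for this sketch, not the real `RailStage` or `ceci` code. The sketch shows how a single classmethod can build a stage of whichever subclass it is called on, and why two stages that fall back to the class name collide.

```python
class ToyStage:
    """Hypothetical, simplified stand-in for a ceci/RAIL stage."""

    def __init__(self, name, config):
        self.name = name
        self.config = config

    @classmethod
    def make_stage(cls, **kwargs):
        # cls is whichever subclass make_stage was called on, so one method
        # builds stages of any type; 'name' defaults to the class name
        name = kwargs.pop('name', cls.__name__)
        # Remaining keyword arguments become the stage configuration
        return cls(name, kwargs)


class ToyErrorModel(ToyStage):
    pass


class ToyPipeline:
    """Hypothetical container that requires unique stage names."""

    def __init__(self):
        self.stages = {}

    def add_stage(self, stage):
        if stage.name in self.stages:
            raise ValueError(f"duplicate stage name: {stage.name}")
        self.stages[stage.name] = stage


toy_pipe = ToyPipeline()
# Explicit, unique names: two stages of the same type coexist
toy_pipe.add_stage(ToyErrorModel.make_stage(name='errors_a', seed=1))
toy_pipe.add_stage(ToyErrorModel.make_stage(name='errors_b', seed=2))

toy_dup_error = None
try:
    # Two unnamed stages of the same class both default to 'ToyErrorModel'
    toy_pipe.add_stage(ToyErrorModel.make_stage(seed=3))
    toy_pipe.add_stage(ToyErrorModel.make_stage(seed=4))
except ValueError as err:
    toy_dup_error = str(err)
    print(toy_dup_error)

print(sorted(toy_pipe.stages))
```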

In [None]:
flow_engine_test = FlowCreator.make_stage(name='flow_engine_test', 
                                         model=flow_file, n_samples=50)
      
lsst_error_model_test = LSSTErrorModel.make_stage(name='lsst_error_model_test',
                                                  bandNames=band_dict)
                
col_remapper_test = ColumnMapper.make_stage(name='col_remapper_test', hdf5_groupname='',
                                            columns=rename_dict)

flow_post_test = FlowPosterior.make_stage(name='flow_post_test',
                                          column='redshift', flow=flow_file,
                                          grid=post_grid)

table_conv_test = TableConverter.make_stage(name='table_conv_test', output_format='numpyDict', 
                                            seed=12345)


### Make the pipeline and add the stages

Here we make an empty interactive pipeline (interactive in the sense that it will be run locally, rather than using the batch submission mechanisms built into `ceci`), and add the stages to that pipeline.

In [None]:
pipe = ceci.Pipeline.interactive()
stages = [flow_engine_test, lsst_error_model_test, col_remapper_test, table_conv_test]
for stage in stages:
    pipe.add_stage(stage)

### Here are some examples of interactive introspection into the pipeline

That is, here are some functions you can use to figure out what the pipeline is doing.

In [None]:
# Get the names of the stages
pipe.stage_names

In [None]:
# Get the configuration of a particular stage
pipe.flow_engine_test.config

In [None]:
# Get the list of output 'tags'
# These are how the stage thinks of its outputs: a list of names associated with DataHandle types.
pipe.flow_engine_test.outputs

In [None]:
# Get the list of output 'aliased tags'
# These are how the pipeline thinks of the outputs: a unique key that points to a particular file
pipe.flow_engine_test._outputs
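
The difference between the two views can be sketched with plain dictionaries. The stage outputs, the handle type labels, and the `{tag}_{stage_name}` aliasing rule below are assumptions chosen for illustration, not read from `ceci`; inspect the two cells above to see the real values for `flow_engine_test`. The point is that several stages may each call their product `output`, so the pipeline needs a distinct key per stage.

```python
# Hypothetical stage-level view: each stage calls its main product 'output'
toy_outputs = {
    'flow_engine_test': [('output', 'TableHandle')],
    'lsst_error_model_test': [('output', 'TableHandle')],
}


def alias(tag, stage_name):
    # Assumed aliasing rule for illustration: make the key unique per stage
    return f"{tag}_{stage_name}"


# Hypothetical pipeline-level view: one unique key per (stage, tag) pair
toy_aliased = {
    stage_name: {tag: alias(tag, stage_name) for tag, _handle_type in tags}
    for stage_name, tags in toy_outputs.items()
}

for stage_name, mapping in toy_aliased.items():
    print(stage_name, mapping)
```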

### Okay, now let's connect up the pipeline stages

We can use the `RailStage.connect_input` function to connect one stage to another.
By default, this connects the upstream stage's data product called `output` to the input of the downstream stage.
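
Conceptually, connecting stages means pointing a downstream stage's input tag at the data product an upstream stage produces under an output tag. The sketch below models that bookkeeping with a hypothetical `ToyConnectable` class. The `inputTag` keyword appears in the commented-out call in the next cell; `outputTag`, the defaults, and the internals here are assumptions, not RAIL's code.

```python
class ToyConnectable:
    """Hypothetical stage that records where each of its inputs comes from."""

    def __init__(self, name):
        self.name = name
        self.input_sources = {}

    def connect_input(self, other, inputTag='input', outputTag='output'):
        # Record that our `inputTag` reads the upstream stage's `outputTag` product
        self.input_sources[inputTag] = (other.name, outputTag)
        return self


toy_engine = ToyConnectable('flow_engine_test')
toy_errors = ToyConnectable('lsst_error_model_test')
toy_remap = ToyConnectable('col_remapper_test')

# Same chaining pattern as the next cell: engine -> errors -> remap
toy_errors.connect_input(toy_engine)
toy_remap.connect_input(toy_errors)

for toy_stage in (toy_errors, toy_remap):
    print(toy_stage.name, '<-', toy_stage.input_sources)
```

Because each call only records a reference to the upstream product, no data needs to exist yet; the actual files are produced later, when the pipeline runs.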

In [None]:
lsst_error_model_test.connect_input(flow_engine_test)
col_remapper_test.connect_input(lsst_error_model_test)
#flow_post_test.connect_input(col_remapper_test, inputTag='input')
table_conv_test.connect_input(col_remapper_test)

### Initialize the pipeline

This will do a few things:
1. Attach any global pipeline inputs that were not specified in the connections above. In our case, the input flow file is pre-existing and must be specified as a global input.
2. Specify the output and logging directories.
3. Optionally, create the pipeline in 'resume' mode, where it will skip any stage whose outputs all already exist.
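
The resume behavior in item 3 amounts to a simple rule: in resume mode, skip a stage only if every one of its output files already exists. Here is a minimal sketch of that rule with a hypothetical `should_run` helper and made-up file names; it is not `ceci`'s implementation.

```python
import os
import tempfile


def should_run(output_paths, resume):
    """Return True if a stage must run; in resume mode, skip it only when all outputs exist."""
    if not resume:
        return True
    return not all(os.path.exists(path) for path in output_paths)


with tempfile.TemporaryDirectory() as tmpdir:
    # Hypothetical output files for one stage
    outputs = [os.path.join(tmpdir, 'output_a.pq'), os.path.join(tmpdir, 'output_b.pq')]

    # Only one of the two outputs exists: the stage still has to run
    open(outputs[0], 'w').close()
    partial = should_run(outputs, resume=True)

    # Both outputs exist: resume mode skips the stage
    open(outputs[1], 'w').close()
    complete = should_run(outputs, resume=True)

    # With resume=False the stage always runs
    forced = should_run(outputs, resume=False)

print(partial, complete, forced)
```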

In [None]:
pipe.initialize(dict(model=flow_file), dict(output_dir='.', log_dir='.', resume=False), None)

### Save the pipeline

This will actually write two files (because this is what `ceci` expects):

1. `pipe_saved.yml`, which will have a list of stages, with instructions on how to execute the stages (e.g., run this stage in parallel on 20 processors). For an interactive pipeline, those instructions will be trivial.
2. `pipe_saved_config.yml`, which will have a dictionary of configurations for each stage.
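
The split between the two files can be sketched with plain dictionaries. The helper `split_pipeline`, the key names (`stages`, `nprocess`, `config`), and the rule that the config file name replaces the extension with `_config.yml` are assumptions for illustration only; open the files that `ceci` writes in your working directory to see the real layout.

```python
import os

# Hypothetical per-stage configurations, keyed by the unique stage names
toy_configs = {
    'flow_engine_test': {'n_samples': 50},
    'lsst_error_model_test': {'bandNames': {'u': 'mag_u_lsst'}},
}


def split_pipeline(pipeline_file, configs):
    """Split a pipeline description into two documents (assumed layout)."""
    # Assumed naming rule: strip the extension and append '_config.yml'
    base, _ = os.path.splitext(pipeline_file)
    config_file = base + '_config.yml'
    # Document 1: which stages to run and how (trivial when run interactively)
    pipeline_doc = {
        'stages': [{'name': name, 'nprocess': 1} for name in configs],
        'config': config_file,
    }
    # Document 2: the per-stage configuration dictionaries
    config_doc = dict(configs)
    return pipeline_doc, config_file, config_doc


toy_doc, toy_config_file, toy_config_doc = split_pipeline('pipe_saved.yml', toy_configs)
print(toy_config_file)
print(toy_doc['stages'])
```

Keeping execution instructions separate from stage configuration means you can rerun the same stages with different settings by editing only the config file.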

In [None]:
pipe.save('pipe_saved.yml')

### Read the saved pipeline

In [None]:
pr = ceci.Pipeline.read('pipe_saved.yml')

### Run the newly read pipeline

This will actually launch a Unix process to run each stage of the pipeline individually; you can see the commands being executed in each case.
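
Running each stage in its own process can be sketched with the standard library's `subprocess` module. The commands below are placeholders that only print the stage name using the current Python interpreter; the real per-stage commands that `ceci` builds and prints will differ.

```python
import subprocess
import sys

toy_stage_names = ['flow_engine_test', 'lsst_error_model_test',
                   'col_remapper_test', 'table_conv_test']

toy_results = []
for name in toy_stage_names:
    # Placeholder command standing in for the per-stage command ceci would build
    cmd = [sys.executable, '-c', f'print("running {name}")']
    print(' '.join(cmd))
    # Run stages one after another; check=True raises if a stage exits with an error
    completed = subprocess.run(cmd, capture_output=True, text=True, check=True)
    toy_results.append(completed.stdout.strip())

print(toy_results)
```

Running stages in order, each in a fresh process, keeps them isolated from one another: a stage only sees what earlier stages wrote to disk, which is why the connections set up earlier are expressed in terms of data products rather than in-memory objects.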

In [None]:
pr.run()