This notebook is very similar to the [RWAnalyzer simple pipeline](RWAnalyzer%20simple%20pipeline.ipynb) notebook.
It sets up and runs a series of analysis stages that are dispatched either locally (using the `subprocess` Python module) or remotely onto a *Slurm*+*singularity* enabled computer cluster, to parallelize the computation for each stage.

The analysis consists of processing regions of interest from two SPT data files.
Each region of interest is spatially segmented, and then DV inference is performed in the resulting space bins.

The main difference from the previous notebook is that three stages are defined instead of the single `tessellate_and_infer` stage:

* a *tessellate* stage performs the segmentation,
* an *infer* stage runs the inference,
* and in-between a *reload* stage is introduced to synchronize the multiple workspaces, aligning the state of the RWAnalyzer objects on the *.rwa* files generated or updated by the *tessellate* stage.

As in the previous notebook, once this notebook is run at least up to the `a.run()` cell, the corresponding *.ipynb* file is exported and run in other processes or on worker nodes.
Again, this implies that the notebook should be saved (*Save and Checkpoint*) before you *Restart & Run All*, if it has been modified.

The first notebook cells show how to set up the pipeline. The pipeline is actually launched at the `a.run()` code cell, where `a` is the main `RWAnalyzer` object.
The notebook lines after the first call to the `run` method are never dispatched. Any second or third call to `run` would run the same initial part of the notebook.
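The effect of this rule can be pictured with a toy function that keeps a script only up to its first `.run()` call. This is an illustration of the behavior described above, not the export logic actually used by `tramway`; the function name `truncate_after_first_run` is hypothetical.

```python
def truncate_after_first_run(script, marker=".run()"):
    """Keep the lines of `script` up to and including the first call to run."""
    kept = []
    for line in script.splitlines():
        kept.append(line)
        # ignore anything that follows a comment sign (naive, for illustration only)
        code = line.split("#", 1)[0]
        if marker in code:
            break
    return "\n".join(kept)


script = "\n".join([
    "a = RWAnalyzer()",
    "a.pipeline.append_stage(stages.tessellate())",
    "# a.run() in a comment is ignored",
    "a.run()",
    "print('post-processing, submit side only')",
    "a.run()",
])

dispatched = truncate_after_first_run(script)
print(dispatched)
```

In this toy model, the post-processing line and the second `a.run()` never reach the workers, which is why any later call to `run` can only replay the same initial part of the script.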

# A *tessellate-and-infer* pipeline to resolve diffusivity and effective potential in space

Let us first define the same analyzer as in the [RWAnalyzer simple pipeline](RWAnalyzer%20simple%20pipeline.ipynb):

In [1]:
import os

wd = '~/' + os.path.relpath(os.getcwd(), os.path.expanduser('~')).replace('\\', '/')

In [2]:
from tramway.analyzer import *

a                                 = RWAnalyzer()

a.spt_data.from_ascii_files(f'{wd}/data-examples/*.rpt.txt')
a.spt_data.localization_precision = 0.03

a.roi.from_ascii_files(suffix='roi') # => *.rpt-roi.txt

a.tesseller                       = tessellers.GWR
a.tesseller.resolution            = 0.05

a.mapper.from_plugin('stochastic.dv')
a.mapper.diffusivity_prior        = 20
a.mapper.potential_prior          = 1
a.mapper.max_runtime              = 100 # in seconds; 100 seconds is much too short for a proper DV estimation, but convenient for a quick example
a.mapper.verbose                  = False
a.mapper.worker_count             = None if os.name == 'nt' else 4 # Windows OS is not fully supported yet

The pipeline stages are defined below. The second code cell uses building blocks available in the `stages` module exported by the `tramway.analyzer` package.

Prior to the three main stages `tessellate`, `reload` and `infer`, just like in the previous notebook, we can also ensure that none of the generated *.rwa* files exists, by introducing the following `fresh_start` stage:

In [3]:
def fresh_start(self):
    """
    Deletes the *.rwa* files associated with the SPT data files, if any.
    """
    for f in self.spt_data:
        rwa_file = os.path.splitext(f.source)[0] + '.rwa'
        try:
            os.unlink(rwa_file)
        except FileNotFoundError:
            pass
        else:
            f.logger.debug(f'file deleted: {rwa_file}')

a.pipeline.append_stage(fresh_start)
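As a quick check of the file naming used in `fresh_start`, the snippet below applies the same `os.path.splitext` expression to one of the example file names that appear in the log further down. Only the last extension is replaced, which is why the generated files end with *.rpt.rwa*.

```python
import os

# same expression as in fresh_start, applied to a plain string instead of f.source
source = 'data-examples/Manip01-01-Beta400AA-01-15ms.rpt.txt'
rwa_file = os.path.splitext(source)[0] + '.rwa'

print(rwa_file)  # data-examples/Manip01-01-Beta400AA-01-15ms.rpt.rwa
```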

In [4]:
a.pipeline.append_stage(stages.tessellate())
a.pipeline.append_stage(stages.reload())
a.pipeline.append_stage(stages.infer())

The remainder of the notebook, up to the `a.run()` expression, is similar to the previous notebook:

In [5]:
#a.env                             = environments.Maestro # works only over Institut Pasteur's VPN or on campus
a.env                             = environments.LocalHost # replacement so that the demo can work anywhere
a.env.worker_count                = 10

a.env.script                      = 'RWAnalyzer multi-stage pipeline.ipynb'

The `run` method launches the pipeline.
The workload is concentrated in the following code cell:

In [6]:
a.run()

working directory: /tmp/tmparbvuuuf
setup complete
running: jupyter nbconvert --to python "/home/flaurent/github/TRamWAy/notebooks/RWAnalyzer multi-stage pipeline.ipynb" --stdout
initial dispatch done

jobs ready
jobs submitted
setup complete
stage 0 ready
stage 0 done
jobs complete
skipping empty file /tmp/tmparbvuuuf/tmp94i356cm.rwa
skipping empty file /tmp/tmparbvuuuf/tmpb8ycqp2t.rwa

jobs ready
jobs submitted
setup complete
stage 1 ready
tessellating roi: 'roi000' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-02-15ms.rpt.txt')...
tessellating roi: 'roi001' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-02-15ms.rpt.txt')...
tessellating roi: 'roi002' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-02-15ms.rpt.txt')...
tessellating roi: 'roi003' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-02-15ms.rpt.txt')...
tessellating r

setup complete
stage 1 ready
tessellating roi: 'roi000' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-01-15ms.rpt.txt')...
tessellating roi: 'roi001' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-01-15ms.rpt.txt')...
tessellating roi: 'roi002' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-01-15ms.rpt.txt')...
tessellating roi: 'roi003' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-01-15ms.rpt.txt')...
tessellating roi: 'roi004' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-01-15ms.rpt.txt')...
tessellating roi: 'roi005' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-01-15ms.rpt.txt')...
tessellating roi: 'roi006' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-01-15ms.rpt.txt')...
tessellating roi: 'roi007'

setup complete
stage 3 ready
inferring on roi: 'roi026' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-02-15ms.rpt.rwa')...
stage 3 done
setup complete
stage 3 ready
inferring on roi: 'roi027' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-02-15ms.rpt.rwa')...
stage 3 done
setup complete
stage 3 ready
inferring on roi: 'roi028' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-02-15ms.rpt.rwa')...
stage 3 done
setup complete
stage 3 ready
inferring on roi: 'roi029' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-02-15ms.rpt.rwa')...
stage 3 done
setup complete
stage 3 ready
inferring on roi: 'roi030' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-02-15ms.rpt.rwa')...
stage 3 done
setup complete
stage 3 ready
inferring on roi: 'roi031' (in source '/home/flaurent/github/TRamWAy/notebooks/data-ex

setup complete
stage 3 ready
inferring on roi: 'roi061' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-02-15ms.rpt.rwa')...
stage 3 done
setup complete
stage 3 ready
inferring on roi: 'roi062' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-02-15ms.rpt.rwa')...
stage 3 done
setup complete
stage 3 ready
inferring on roi: 'roi063' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-02-15ms.rpt.rwa')...
stage 3 done
setup complete
stage 3 ready
inferring on roi: 'roi064' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-02-15ms.rpt.rwa')...
sparse_grad failed at all the columns

sparse_grad failed at all the columns

sparse_grad failed at all the columns

sparse_grad failed at all the columns

stage 3 done
error: Process None-2 died with error (most recent call last):
  File "/home/flaurent/github/TRamWAy/tramway/core/parallel/__init__.py",

setup complete
stage 3 ready
inferring on roi: 'roi000' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-01-15ms.rpt.rwa')...
stage 3 done
setup complete
stage 3 ready
inferring on roi: 'roi001' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-01-15ms.rpt.rwa')...
stage 3 done
setup complete
stage 3 ready
inferring on roi: 'roi002' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-01-15ms.rpt.rwa')...
stage 3 done
setup complete
stage 3 ready
inferring on roi: 'roi003' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-01-15ms.rpt.rwa')...
stage 3 done
setup complete
stage 3 ready
inferring on roi: 'roi004' (in source '/home/flaurent/github/TRamWAy/notebooks/data-examples/Manip01-01-Beta400AA-01-15ms.rpt.rwa')...
stage 3 done
setup complete
stage 3 ready
inferring on roi: 'roi005' (in source '/home/flaurent/github/TRamWAy/notebooks/data-ex

skipping empty file /tmp/tmparbvuuuf/tmp28ym7hko.rwa
skipping empty file /tmp/tmparbvuuuf/tmp4kg3x1jx.rwa
skipping empty file /tmp/tmparbvuuuf/tmpm201m_tg.rwa
skipping empty file /tmp/tmparbvuuuf/tmp0d9t6e03.rwa
skipping empty file /tmp/tmparbvuuuf/tmpenx3q_bs.rwa
reading file: /tmp/tmparbvuuuf/tmp450hlelx.rwa
skipping empty file /tmp/tmparbvuuuf/tmpkckl1700.rwa
reading file: /tmp/tmparbvuuuf/tmp59oxcrn3.rwa
skipping empty file /tmp/tmparbvuuuf/tmpd2327fh8.rwa
skipping empty file /tmp/tmparbvuuuf/tmpo977is8q.rwa
reading file: /tmp/tmparbvuuuf/tmpi68zr61x.rwa
skipping empty file /tmp/tmparbvuuuf/tmp5ocxrky6.rwa
reading file: /tmp/tmparbvuuuf/tmp_gu8z4iy.rwa
skipping empty file /tmp/tmparbvuuuf/tmp06uycxh7.rwa
reading file: /tmp/tmparbvuuuf/tmpg2cw9_rr.rwa
skipping empty file /tmp/tmparbvuuuf/tmp91ihzjkk.rwa
skipping empty file /tmp/tmparbvuuuf/tmptjw2g0wm.rwa
reading file: /tmp/tmparbvuuuf/tmpsrbuf4ai.rwa
reading file: /tmp/tmparbvuuuf/tmpogxj7i9e.rwa
skipping empty file /tmp/tmparbvuuu

At this point, the pipeline is complete. As many *.rwa* files as input SPT data files were generated both on the local and remote hosts (if different).

# The *reload* bootstrap stage

While the `tessellate` and `infer` stages have obvious goals, the `reload` stage might look optional.
Indeed, if no so-called *environments* are defined and all the stages are run sequentially in the notebook kernel, the data representations (the analysis trees) are available in the local RWAnalyzer object.

However, in truly distributed settings, the data representations need to be synchronized at specific points in the processing chain.

The communication protocol between the so-called environment and the pipeline stages is too simple for the environment to automatically determine whether an arbitrary stage has updated the analysis trees. As a consequence, the task of synchronizing the analysis trees is left to the user.

Both the `tessellate` and `infer` stages update the analysis trees. The `reload` stage should also be run after the `infer` stage if any post-processing is performed. Note however that the pre-processing, "processing" and post-processing steps may usually be split into distinct scripts.

Synchronizing is also required when the number of items at the specified granularity for a stage is defined by a previous stage. For example, a first stage can define regions of interest in an unpredictable number, and a second stage can be requested to independently process each ROI. For the second stage to be properly dispatched (one task per ROI), the submit process requires access to the output of the first stage.
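The dependency between the two stages can be sketched with plain Python objects. Everything below is hypothetical (the `find_rois` and `schedule` functions, the file names, the `process_roi` stage name) and only mimics the situation described above: the submit side cannot enumerate one task per ROI until it has seen the output of the first stage.

```python
import random


def find_rois(source, rng):
    """Hypothetical first stage: returns an unpredictable number of ROI labels."""
    return [f"roi{i:03d}" for i in range(rng.randint(1, 5))]


def schedule(stage, known_rois):
    """One task per (source, ROI) pair known to the submit side."""
    return [(stage, source, roi)
            for source, rois in known_rois.items()
            for roi in rois]


rng = random.Random(0)
sources = ["file1.rpt.txt", "file2.rpt.txt"]

# what the submit side knows before the first stage has run
submit_side = {source: [] for source in sources}
# the first stage runs on the worker side and defines the ROIs there
worker_side = {source: find_rois(source, rng) for source in sources}

tasks_before_sync = schedule("process_roi", submit_side)

# synchronization: the submit side loads the output of the first stage
submit_side.update(worker_side)
tasks_after_sync = schedule("process_roi", submit_side)

print(len(tasks_before_sync), len(tasks_after_sync))
```

Before synchronization the task list is empty; afterwards it contains exactly one task per ROI found by the first stage.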

In our example here, the SPT data files and ROI files are readily available and the number of tasks for the `tessellate` and `infer` stages can be easily determined.
However, the `reload` stage is required for another reason:

To run each stage, the entire script is run and the other stages are simply skipped. Here, the input data are SPT ASCII files, *i.e.* SPT data only. When the `infer` stage runs and the `tessellate` stage is skipped, nothing in the script mentions the *.rwa* files. The `infer` stage only loops over the regions of interest and expects to find the tessellations in the analysis trees. This is the role of the `reload` stage: it assumes the *.rwa* files exist, guesses their names from the SPT data files, and loads the corresponding analysis trees.

Unlike the `tessellate` and `infer` stages, the `reload` stage must be performed both on the submit and worker sides and is qualified here as a *bootstrap* stage.
Every worker runs this bootstrap stage before running the subsequent `infer` task.

In the end, the pipeline consists of two effective stages, first `tessellate` and second `reload`+`infer`, both dispatched onto the remote/worker host, plus `reload` that is also run on the local/submit host.
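The worker-side behavior can be mimicked with a toy model, in which a dictionary stands for the *.rwa* files on disk and every worker starts from a fresh analysis tree that contains the SPT data only. This is a sketch under these assumptions; the `run_worker` function and the data structures are hypothetical and do not reflect the actual `tramway` implementation.

```python
disk = {}  # stands for the .rwa files shared between successive executions


def tessellate(tree):
    tree["tessellation"] = "mesh"
    disk["file.rwa"] = dict(tree)


def reload(tree):
    # load whatever the previous stages saved
    tree.update(disk.get("file.rwa", {}))


def infer(tree):
    if "tessellation" not in tree:
        raise KeyError("no tessellation in the analysis tree")
    tree["maps"] = "DV maps"
    disk["file.rwa"] = dict(tree)


# (name, function, is_bootstrap)
pipeline = [
    ("tessellate", tessellate, False),
    ("reload", reload, True),
    ("infer", infer, False),
]


def run_worker(pipeline, target):
    """Re-run the whole 'script' for stage index `target`; the other stages are
    skipped, except the bootstrap stages that precede the target."""
    tree = {"spt_data": "trajectories"}  # fresh process: SPT data only
    executed = []
    for index, (name, stage, is_bootstrap) in enumerate(pipeline):
        if index == target or (is_bootstrap and index < target):
            stage(tree)
            executed.append(name)
    return executed, tree


first_pass, _ = run_worker(pipeline, 0)
second_pass, tree = run_worker(pipeline, 2)
print(first_pass, second_pass)

# without the bootstrap stage, the infer worker finds no tessellation
pipeline_without_reload = [pipeline[0], pipeline[2]]
try:
    run_worker(pipeline_without_reload, 1)
    infer_failed = False
except KeyError as error:
    infer_failed = True
    print("infer failed:", error)
```

The worker assigned to `infer` executes `reload` first and then finds the tessellation; once `reload` is removed, the same worker fails because its analysis tree was never populated.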

# Sequence diagram

The procedure for distributing computations using the `LocalHost` environment is similar to, and slightly simpler than, the procedure for the `Maestro` environment, which is a specialized `SlurmOverSSH` environment:

![Sequence diagram](SlurmOverSSH.svg?1)

The above sequence diagram is available [here](https://sequencediagram.org/index.html#initialData=CoSwLgNgpgBACiADlCIB2soA8oGMCuYIA9mjAO7gAWMAyhPgE4C2A8gG5SO20ASMUNOxCNSzQWADOAKGmIAhoyK4k8tGADmo-IhgBiAMIBWAKIBBACIAhGACpbtfACNm4GJJAATKPbmLlquowAEQAUjoAnmBcADpoAGRxaMTRTsTEANbBMPKSMKF+SiAqCkHBji5uiKK4UJKS2bkwADLSgp6FAaWa2rp6qBpUYE4MsPYA6sSMGVzuXj62ncWBYCEAsrnRjDDJ3o15a0slaqvlDCz7MABKRyshk9Ozu1CX421oHdKhALQAfLbNABc1zqYH8MHi13wZDMEAg0maf1CwLQTlwpE4SgRAB5vt9kTAAKqSWaSXCMJBgaSoDCKBF-NbAgxUPAZGDo9TydCxNDydhciDyJwgVBgCLSNYM4EWYjkNAQYjyTxxEAAMxgrnq6A01O5ihgxFVquk8lwRH50Rgh28pvN8mi8lVWyt9N+jJgBkYUHtsHIUwy2pgnhEeDAU3FiLd0pAkgUYFwNHwJO2AApEBEAJTucmU6R6LTEHT6ABixYADBWyzBVVMBKaaOjmIhiB5LZIwRpYHpSz3i9IC0Wa9tvQmBEbQyBOO4O1BXUCoWQ0il24x5Lp2-JOzA1Tk0OLbZOfS1pDazYeHU7Zq1I+6LDG46PJE57aOU5IqFmyRTEFSad7GK6VzAlc0LuM+8Y0F+uZXH84zAaBSakjmP4qmQHJgtyjBxJQYA0GCkhsooGj4OI6gyH++qGsa4ywcCHBcBS3gGoQiCEHEjDkPI1YilAcZUDING-HBC4wEuUhgKuiBxBuW47mo+5nhasBvKedoXs6byCcJIFkFxHhoBo0AwPhGQmopR5vAOugKsQiCAUyLK4GyG5gEmOJ4kBXy4t885XLxUxUpG87jFyVLtBK3nCXA-ggPIEAwBxXGqjxu6eDAiGMN83jJRgaXonCE4jHUJ4jmpym6rS2yure94vjQahpYw0JxPl0BnqQMBppm2bflSkpRh6xDMMKGBxAoRRxQlnHcdAkhxA17Jej6Y1cB47YSFNSU8eRerbFRZllS6zTee6zQxqshowMgjBrdEQStYVs0lQeSlHVKIISSAUBTnF8UPWeRUyC9R7XidgLPeZlqtOFoTeUCQA).


# Granularity and parallelism for time segments

As already explained in the [previous notebook](RWAnalyzer%20simple%20pipeline.ipynb#Stage-granularity-and-pipeline-ready-iterators), each stage is dispatched according to the specified granularity.

Here, unlike `tessellate_and_infer` that runs at ROI granularity, `tessellate` runs once for each SPT data file by default, to avoid the overhead of reading/writing files for each ROI, while `infer` does run at ROI granularity.
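The difference in the number of dispatched tasks can be illustrated with a toy enumeration. The file names, ROI counts and the granularity labels `'spt data'` and `'roi'` below are made up for the illustration.

```python
# hypothetical inventory: ROI labels per SPT data file
rois_per_file = {
    "A.rpt.txt": ["roi000", "roi001", "roi002"],
    "B.rpt.txt": ["roi000", "roi001"],
}


def enumerate_tasks(granularity):
    """List the work items a stage would be dispatched on."""
    if granularity == "spt data":
        return [(source,) for source in rois_per_file]
    if granularity == "roi":
        return [(source, roi)
                for source, rois in rois_per_file.items()
                for roi in rois]
    raise ValueError(f"unsupported granularity: {granularity}")


tessellate_tasks = enumerate_tasks("spt data")  # one task per file
infer_tasks = enumerate_tasks("roi")            # one task per ROI
print(len(tessellate_tasks), len(infer_tasks))  # 2 5
```

This matches the shape of the log above, where `stage 1 ready` appears once per SPT data file and is followed by several ROIs, whereas `stage 3 ready` appears once per ROI.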

If the data for the inference are also segmented in time, *e.g.* using a sliding window, and no time regularization is performed, a DV inference (for example) can operate in each time segment independently.

However, the predefined `infer` stage does not split the computation down to the `'time segment'` granularity.
This is possible by writing a modified *infer* procedure that iterates over the time segments, generates one `Maps` object per time segment, and commits these `Maps` objects as analysis artefacts into the analysis tree under different labels (to be generated with the `time.segment_label` method). However, this is beyond the scope of this tutorial.

Instead, we can let the `mapper` attribute locally parallelize the computation across the time segments (*locally* here refers to the worker host, *e.g.* a compute node on the cluster with `SlurmOverSSH` environments).
The predefined `infer` stage schedules as many tasks as support regions (or SPT data files if no ROIs are defined) and assigns each task to a different worker; the `mapper` attribute then makes each task spawn multiple processes on the worker node to analyze the time segments in parallel.

Unlike the old `tramway.helper.inference.infer` function, when time segments are defined and no time regularization is expected, the `mapper` attribute runs any inference function/plugin with the default `mapper.cell_sampling='connected'`. With this setting, the time segments are identified in the global microdomain adjacency matrix (spatio-temporal microdomains are also referred to as 'cells') and separated into distinct connected components of microdomains, so that the defined inference procedure applies separately to each connected component.

Two notes:

* a time segment may result in more than one connected component, if some microdomains are marked as not valid and the area or volume to be mapped is consequently not contiguous;
* an inference function/plugin that requires access to all the time segments at once, and does not define specific time regularization arguments, may require disabling this behavior with `mapper.cell_sampling=None`.
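The idea of connected components in a spatio-temporal adjacency structure can be sketched in pure Python. The example below is a toy model with 3 spatial cells in a row and 2 time segments, not the data structures used by `tramway`. Without temporal links, each time segment forms its own component, and removing an invalid cell splits a segment in two.

```python
from collections import deque


def connected_components(adjacency):
    """Label each cell with the index of its connected component.
    `adjacency` maps a cell index to the set of its neighbor cells."""
    labels = {}
    current = 0
    for start in adjacency:
        if start in labels:
            continue
        labels[start] = current
        queue = deque([start])
        while queue:
            cell = queue.popleft()
            for neighbor in adjacency[cell]:
                if neighbor not in labels:
                    labels[neighbor] = current
                    queue.append(neighbor)
        current += 1
    return labels


def without_cells(adjacency, invalid):
    """Drop the invalid cells and every link to them."""
    return {cell: neighbors - invalid
            for cell, neighbors in adjacency.items()
            if cell not in invalid}


n_space, n_segments = 3, 2
spatial_edges = [(0, 1), (1, 2)]  # cells 0-1-2 in a row

# spatio-temporal cell index = segment * n_space + spatial index;
# no edge links cells from different time segments (no time regularization)
adjacency = {cell: set() for cell in range(n_space * n_segments)}
for t in range(n_segments):
    for i, j in spatial_edges:
        a, b = t * n_space + i, t * n_space + j
        adjacency[a].add(b)
        adjacency[b].add(a)

labels = connected_components(adjacency)
n_components = len(set(labels.values()))

# marking the middle cell of the first segment as invalid splits that segment
labels_with_gap = connected_components(without_cells(adjacency, {1}))
n_components_with_gap = len(set(labels_with_gap.values()))

print(n_components, n_components_with_gap)  # 2 3
```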

# Partial interim *.rwa* files for large data files

With larger datasets and finer granularity, a pipeline stage such as `infer` may generate a lot of intermediate (or interim) *.rwa* files. By default, these files are augmented copies of the input *.rwa* files, and the same data may be replicated many times.

The `tessellate_and_infer` stage defined in the module `stages` exported by `tramway.analyzer`, compared to the [naive implementation](RWAnalyzer%20simple%20pipeline.ipynb#Editing-the-tessellate_and_infer-stage), includes additional logic for saving only the modified branches of each analysis tree, and then recombines these partial *.rwa* files into complete output *.rwa* files. This avoids making many copies of the input files, and consequently saves memory and I/O time.

Basically, `tessellate_and_infer` keeps track of the modified branches, and deletes the other branches before exiting the `with *.autosaving():` block. The recombination can be requested by passing the argument `update_existing_rwa_files=True` to `append_stage`; for `tessellate_and_infer` this is implicit.
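The prune-then-recombine idea can be sketched with nested dictionaries standing in for analysis trees. This is a toy model, not the *.rwa* file format or the `tramway` implementation; the helpers `new_node`, `prune` and `merge` and the `'dv'` label are hypothetical.

```python
import copy


def new_node(data):
    return {"data": data, "children": {}}


def prune(node, paths):
    """Copy `node`, keeping only the branches that lead to one of `paths`
    (tuples of labels). The root data are always kept."""
    pruned = {"data": node["data"], "children": {}}
    tails_by_label = {}
    for path in paths:
        if path:
            tails_by_label.setdefault(path[0], []).append(path[1:])
    for label, tails in tails_by_label.items():
        child = node["children"][label]
        if () in tails:  # the path ends here: keep the whole subtree
            pruned["children"][label] = copy.deepcopy(child)
        else:
            pruned["children"][label] = prune(child, tails)
    return pruned


def merge(target, partial):
    """Graft the branches of `partial` that are missing in `target`, in place."""
    for label, child in partial["children"].items():
        if label in target["children"]:
            merge(target["children"][label], child)
        else:
            target["children"][label] = copy.deepcopy(child)
    return target


# complete tree after the tessellation step
tree = new_node("spt data")
for roi in ("roi000", "roi001"):
    tree["children"][roi] = new_node(f"tessellation of {roi}")

# each worker adds maps for its own ROI and saves the modified branch only
partials = []
for roi in ("roi000", "roi001"):
    local = copy.deepcopy(tree)
    local["children"][roi]["children"]["dv"] = new_node(f"maps for {roi}")
    partials.append(prune(local, [(roi, "dv")]))

# the partial trees are recombined into the complete tree
for partial in partials:
    merge(tree, partial)

print(sorted(partials[0]["children"]))                   # ['roi000']
print(tree["children"]["roi001"]["children"]["dv"]["data"])  # maps for roi001
```

Each partial tree carries a single ROI branch plus the root node, which is the situation that the placeholder optimization further below addresses.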

The `infer` stage features a similar pruning mechanism, triggered by argument `single_path=True`. The implementation is less general and is expected to work only at the default granularity. Unlike `tessellate_and_infer`, this is not the default behavior and `update_existing_rwa_files=True` should also be explicitly passed:

In [10]:
a.pipeline.append_stage(stages.infer(single_path=True), update_existing_rwa_files=True)

Note that the signature of `infer` may change (arguments may be renamed) in the future, to align its usage with `tessellate_and_infer`, which is promoted as the reference procedure, *i.e.* to be preferred over `tessellate`+`infer`. Please check the [documentation](https://tramway.readthedocs.io/en/latest/tramway.analyzer.html#tramway.analyzer.pipeline.stages.infer).

Another optimization consists of removing the SPT data at the root of the analysis tree, using a placeholder.
Indeed, although in principle the original SPT data are not modified by a `tessellate` or `infer` stage, the saved branches always include the root node of the tree.

This can be done with the `infer` or `tessellate_and_infer` stages, by passing the `spt_data='placeholder'` argument to the stage creation function:

In [7]:
a.pipeline.append_stage(stages.infer(spt_data='placeholder'))

However, the root node must be explicitly restored with an additional pipeline stage:

In [8]:
a.pipeline.append_stage(stages.restore_spt_data())

# Shorter code sample

To make clear which notebook cells are critical for running the presented pipeline with minimal setup, the definition code is summed up below.

Note again that `.run()` should not be called twice in the same Python script or notebook.

In [9]:
import os
from tramway.analyzer import *

a                                 = RWAnalyzer()

a.spt_data.from_ascii_files('data-examples/*.rpt.txt') # relative paths work alright with LocalHost
a.spt_data.localization_precision = 0.03

a.roi.from_ascii_files(suffix='roi') # => *.rpt-roi.txt

a.tesseller                       = tessellers.Hexagons

a.mapper.from_plugin('stochastic.dv')
a.mapper.diffusivity_prior        = 20
a.mapper.potential_prior          = 1
a.mapper.max_runtime              = 100 # in seconds; 100 seconds is much too short for a proper DV estimation, but convenient for a quick example
a.mapper.verbose                  = False
a.mapper.worker_count             = None if os.name == 'nt' else 4 # Windows OS is not fully supported yet

a.pipeline.append_stage(stages.tessellate())
a.pipeline.append_stage(stages.reload())
a.pipeline.append_stage(stages.infer())

a.env                             = environments.LocalHost
a.env.worker_count                = 10

# this code sample cannot run in the current notebook anyway;
# create a new notebook in the same directory, copy-paste this code cell,
# adjust the filename below so that it points at the new notebook,
# and uncomment the a.run() expression
a.env.script                      = 'Untitled.ipynb'

# a.run()