In [1]:
from invisible_cities.dataflow import dataflow as fl

# Dataflow

IC is built on a framework named `dataflow`, which follows a pipeline model: data flows through a chain of components, and the stream can, for example, be split into several streams that each perform a different task.

In dataflow, a workflow is composed of three elements:
- a source: provides the data downstream (e.g. reading from a file)
- a pipe: transforms, filters and consumes the data
- a result: the output of the workflow

Pipes are composable objects, i.e. they can contain many sub-pipes. They come in three flavours:
- map (`fl.map`): applies a transformation to the data
- filter (`fl.filter`): selects data based on some criteria
- sink (`fl.sink`): consumes the data (doesn't propagate the data downstream)
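
To make the three flavours concrete, here is a minimal plain-Python sketch of push-style components built from generators. It is not the `dataflow` implementation, and the names `coroutine`, `sketch_map`, `sketch_filter` and `sketch_sink` are hypothetical, introduced only for this illustration. It only shows the idea that each item is pushed through a chain in which a map transforms it, a filter may drop it, and a sink consumes it.

```python
def coroutine(generator_function):
    """Prime a generator so that it is ready to receive items via send()."""
    def start(*args, **kwargs):
        generator = generator_function(*args, **kwargs)
        next(generator)  # advance to the first yield
        return generator
    return start

@coroutine
def sketch_sink(effect):
    # Consumes every item; nothing is sent further downstream.
    while True:
        effect((yield))

@coroutine
def sketch_map(transform, target):
    # Transforms every item and pushes the result to the next component.
    while True:
        target.send(transform((yield)))

@coroutine
def sketch_filter(predicate, target):
    # Pushes an item downstream only if it satisfies the predicate.
    while True:
        item = yield
        if predicate(item):
            target.send(item)

collected = []
pipeline = sketch_filter(lambda x: x % 2 == 0,
           sketch_map   (lambda x: x**2,
           sketch_sink  (collected.append)))

for item in range(10):  # the "source" pushes items into the pipe
    pipeline.send(item)

print(collected)  # [0, 4, 16, 36, 64]
```

In this sketch the chain is built from the sink backwards, because each component needs to know where to send its output. In `dataflow`, `fl.pipe` lets us list the components in reading order instead.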

A workflow can be split into several using:
- branch (`fl.branch`): creates a separate workflow, but does not consume the data
- fork (`fl.fork`): creates many separate workflows, consuming the data (i.e. a fork is a sink)
  
There are other utilities based on these concepts:
- spy (`fl.spy`): meant for inspecting the data without transforming it (although nothing prevents the spying function from modifying the data)
- count (`fl.count`): counts events reaching the component
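
The caveat about spies can be illustrated with a plain-Python sketch. It is not the `dataflow` implementation, and `sketch_spy` is a hypothetical helper. It assumes that a spy simply shows each item to a function and passes the same object on, so a function that mutates the item changes what the downstream components see.

```python
def sketch_spy(observer):
    """Return a pass-through step that shows each item to `observer`."""
    def step(item):
        observer(item)
        return item  # the very same object continues downstream
    return step

seen = []
well_behaved = sketch_spy(lambda item: seen.append(dict(item)))  # stores a copy
intrusive    = sketch_spy(lambda item: item.update(a=-1))        # mutates the item

event        = {"a": 1}
after_first  = well_behaved(event)
after_second = intrusive   (event)

print(seen)          # [{'a': 1}]
print(after_first is event)  # True
print(after_second)  # {'a': -1}
```

Counting the items that reach a given point is a special case of the same idea: the observer only needs to increment a counter.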

Unfortunately, there isn't much documentation. You can find the definitions in

https://github.com/next-exp/IC/blob/master/invisible_cities/dataflow/dataflow.py

and usage examples in

https://github.com/next-exp/IC/blob/master/invisible_cities/dataflow/dataflow_test.py

# City structure

The default dataflow architecture assumes a single input object and a single output object. In IC, we need to handle many objects simultaneously (e.g. event number, time, kDST, hDST, ...) and each component may use only a subset of those. Moreover, the output of a component might need to be combined with others. This is why the data stream in IC is made of dictionaries, in which each key maps to a different object. For instance, the source for beersheba generates the following dictionary:

```
dict(hits         = ...,
     kdst         = ...,
     run_number   = ...,
     event_number = ...,
     timestamp    = ...)
```

To restrict a pipe component to certain entries, we use the `args` argument. For instance

```fl.map(a_function, args="kdst")```

if we only need one argument or

```fl.map(a_function, args=("hits", "event_number"))```

if we need many.

The output of the function follows a similar pattern. To add the output to the data stream under a given label, we use the `out` keyword:

```fl.map(a_function, out="some_name")```

if we only have one output or

```fl.map(a_function, out=("some_name", "some_other_name"))```

if we have many.

Finally, if the output replaces the input, we can use

```fl.map(a_function, args="something", out="something")```

or simply

```fl.map(a_function, item="something")```
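
The effect of `args`, `out` and `item` on a single dictionary can be sketched in plain Python. This is not the library's code: `as_tuple` and `sketch_map_on_dict` are hypothetical helpers written for this illustration. The sketch returns a new dictionary for clarity and makes no claim about how `dataflow` updates the stream internally.

```python
def as_tuple(names):
    """Accept a single label or a sequence of labels and return a tuple."""
    return (names,) if isinstance(names, str) else tuple(names)

def sketch_map_on_dict(function, data, args=None, out=None, item=None):
    """Apply `function` to the entries `args` and store the result under `out`."""
    if item is not None:           # item="x" is shorthand for args="x", out="x"
        args = out = item
    outputs = function(*(data[name] for name in as_tuple(args)))
    if isinstance(out, str):       # a single output is not wrapped in a tuple
        outputs = (outputs,)
    updated = dict(data)           # copy, so the input is left untouched
    updated.update(zip(as_tuple(out), outputs))
    return updated

event = {"a": 3, "b": 4}

print(sketch_map_on_dict(lambda a: a**2, event, args="a", out="a_squared"))
# {'a': 3, 'b': 4, 'a_squared': 9}

print(sketch_map_on_dict(lambda a, b: (a + b, a * b), event,
                         args=("a", "b"), out=("total", "product")))
# {'a': 3, 'b': 4, 'total': 7, 'product': 12}

print(sketch_map_on_dict(lambda a: a + 1, event, item="a"))
# {'a': 4, 'b': 4}
```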

If a given pipeline only needs one of those objects, we can narrow the stream down to it by placing its label in the pipeline:

```("hits", a_function_that_processes_hits)```

Note that none of the components after the selection can access the data under any of the other labels.
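
A plain-Python sketch of what such a selection means, assuming that a bare label in the pipeline replaces the dictionary with the object stored under that label. The helper `sketch_run` is hypothetical and is not part of `dataflow`.

```python
def sketch_run(data, steps):
    """Apply `steps` in order; a string step keeps only that entry of the dictionary."""
    for step in steps:
        if isinstance(step, str):
            data = data[step]   # from here on, only this object is visible
        else:
            data = step(data)
    return data

event = dict(hits=[5.0, 7.5, 2.5], event_number=42)

total = sketch_run(event, ("hits", sum))
print(total)  # 15.0

# After the selection, the other labels are out of reach:
lost_access = False
try:
    sketch_run(event, ("hits", lambda data: data["event_number"]))
except TypeError:
    lost_access = True  # `data` is now the list of hits, not the dictionary
print(lost_access)  # True
```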

# Some examples

## Simple pipeline

In [2]:
a_source  = range(10)
collect   = []
a_pipe    = fl.pipe( fl.map(lambda x: x**2)
                   , fl.sink(collect.append)
                   )
fl.push( source = a_source
       , pipe   = a_pipe)

print(collect)

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


## With a branch

In [3]:
a_source   = range(10)
collect    = []
collect_sq = []
a_pipe     = fl.pipe( fl.branch(fl.sink(collect.append))
                    , fl.map(lambda x: x**2)
                    , fl.sink(collect_sq.append)
                    )
fl.push( source = a_source
       , pipe   = a_pipe)

print(collect)
print(collect_sq)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


## With a fork

In [4]:
a_source   = range(10)
collect    = []
collect_sq = []
collect_cb = []
a_pipe     = fl.pipe( fl.fork(                          fl.sink(collect   .append)
                             , (fl.map(lambda x: x**2), fl.sink(collect_sq.append))
                             , (fl.map(lambda x: x**3), fl.sink(collect_cb.append))
                             )
                    )
fl.push( source = a_source
       , pipe   = a_pipe)

print(collect)
print(collect_sq)
print(collect_cb)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]


## Using a dictionary

In [5]:
a_source   = ({"a": i} for i in range(10))
collect    = []
collect_sq = []
a_pipe     = fl.pipe( fl.map(lambda x: x**2, args="a", out="b")
                    , fl.fork( fl.sink(collect   .append, args="a")
                             , fl.sink(collect_sq.append, args="b"))
                    )
fl.push( source = a_source
       , pipe   = a_pipe)

print(collect)
print(collect_sq)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


# Exploring detsim

```python
fl.push(source = MC_hits_from_files(files_in, rate),               # reads data from file
        pipe   = fl.pipe( fl.slice(*event_range, close_all=True)   # selects events based on configuration
                        , filter_delayed_hits
                        , select_s1_candidate_hits
                        , select_active_hits
                        , filter_events_no_active_hits
                        , fl.branch(write_nohits_filter)           # a separate workflow that simply writes the outcome of the filter to file
                        , events_passed_active_hits.filter         # the actual event filter
                        , simulate_electrons
                        , count_photons
                        , fl.branch(write_dark_evt_filter)         # another filter
                        , dark_events.filter
                        , get_buffer_times_and_length
                        , create_pmt_waveforms
                        , create_sipm_waveforms
                        , get_bin_edges
                        , buffer_calculation
                        , "event_number"                           # keeps only the event number for the remainder of the pipeline
                        , evtnum_collect.sink),                    # collects event numbers using a sink
        result  = dict( events_in     = event_count_in.future
                      , evtnum_list   = evtnum_collect.future
                      , dark_events   = dark_events   .future))
```