# Advanced Event 'Loops'

Iterating over events one at a time in Python is relatively slow, though it can be useful for prototyping.

First, let's open an input event source to read events from.

```python
import pyNUISANCE as pn
import pyProSelecta as pps

evs = pn.EventSource("dune_argon_sf_10mega.nuwro.pb.gz")
if not evs:
    print("Error: failed to open input file")
```

## DataFrames

NUISANCE provides the `FrameGen` facility for declaring functional event processors and then executing them in batch. Let's see an example of how it works. We include the `limit` call to stop the internal loop from running over the entire file.

```python
print(pn.FrameGen(evs).limit(10).all())
```

```
 --------------
 | evt# | cvw |
 --------------
 |    0 |   1 |
 |    1 |   1 |
 |    2 |   1 |
 |    3 |   1 |
 |    4 |   1 |
 |    5 |   1 |
 |    6 |   1 |
 |    7 |   1 |
 |    8 |   1 |
 |    9 |   1 |
 --------------
```
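The declare-then-execute idea can be illustrated with a toy, pure-Python model. `ToyFrameGen` below is a hypothetical class written only for this sketch, not the pyNUISANCE implementation; it shows how chained calls can merely record processors and a read limit, which `all` then applies in a single pass over at most `limit` events.

```python
from itertools import islice


class ToyFrameGen:
    """Hypothetical stand-in for FrameGen: records processors, runs them in all()."""

    def __init__(self, events):
        self._events = events   # any iterable of event objects
        self._columns = []      # (name, function) pairs, applied in order
        self._limit = None      # maximum number of events to read (None = all)

    def limit(self, n):
        self._limit = n
        return self             # returning self is what allows call chaining

    def add_column(self, name, func):
        self._columns.append((name, func))
        return self

    def all(self):
        header = ["evt#", "cvw"] + [name for name, _ in self._columns]
        rows = []
        # islice(events, None) reads everything; otherwise stop after self._limit events
        for evnum, ev in enumerate(islice(self._events, self._limit)):
            rows.append([evnum, 1.0] + [func(ev) for _, func in self._columns])
        return header, rows


# Usage: integers stand in for real event records; every CV weight is 1.0 here.
header, rows = ToyFrameGen(range(1_000_000)).limit(10).add_column("twice", lambda ev: 2 * ev).all()
print(header)     # ['evt#', 'cvw', 'twice']
print(len(rows))  # 10
```

Because `limit` and `add_column` only store configuration, no event is read until `all` is called.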


### Defining New Columns
The Frame returned from `FrameGen.all` always contains the event number and the CV weight for every processed event. These are a useful start, but we can also define new columns to hold projected or calculated event properties, such as the neutrino energy of each event. A column generation function should return the `pn.Frame.missing_datum` flag if its projection cannot be applied to a given event.

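```python
def enu_GeV(ev):
    # find the incoming muon neutrino (PDG code 14)
    bpart = pps.sel.Beam(ev, 14)
    if bpart:
        return bpart.momentum().e() * 1E-3  # event momenta are always in MeV; convert to GeV
    # no muon neutrino in this event: flag the datum as missing
    return pn.Frame.missing_datum

print(pn.FrameGen(evs).limit(10).add_column("enu_GeV", enu_GeV).all())
```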

```
 ------------------------
 | evt# | cvw | enu_GeV |
 ------------------------
 |    0 |   1 |   2.275 |
 |    1 |   1 |    14.3 |
 |    2 |   1 |    2.86 |
 |    3 |   1 |   3.728 |
 |    4 |   1 |    9.08 |
 |    5 |   1 |   3.237 |
 |    6 |   1 |   2.473 |
 |    7 |   1 |   1.916 |
 |    8 |   1 |   1.988 |
 |    9 |   1 |   3.671 |
 ------------------------
```
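The "return a value or a missing flag" contract for column functions can be sketched without NUISANCE. In this toy version the events are hypothetical dictionaries of `(PDG code, energy in MeV)` beam particles, and `MISSING` is a NaN stand-in chosen only for this sketch; the actual value of `pn.Frame.missing_datum` is not assumed.

```python
import math

# Stand-in sentinel for this sketch only; the actual value of pn.Frame.missing_datum is not assumed.
MISSING = float("nan")


def is_missing(value):
    # NaN never compares equal to itself, so test with math.isnan rather than ==
    return math.isnan(value)


def enu_GeV_toy(event):
    """Energy in GeV of the first incoming muon neutrino (PDG 14), else MISSING."""
    for pdg, energy_MeV in event["beam"]:
        if pdg == 14:
            return energy_MeV * 1e-3  # MeV -> GeV
    return MISSING  # projection does not apply to this event


# Hypothetical toy events: lists of (PDG code, energy in MeV) for the beam particles.
events = [
    {"beam": [(14, 2274.87)]},   # nu_mu: projection applies
    {"beam": [(-14, 3100.0)]},   # anti-nu_mu: no PDG 14 beam particle
]
values = [enu_GeV_toy(ev) for ev in events]
valid = [v for v in values if not is_missing(v)]
print(values, valid)
```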


### Getting and Modifying Existing Columns

Column indices can be looked up by name with `find_column_index`. For columns that do not exist, the static sentinel value `pn.Frame.npos` is returned.

```python
fr = pn.FrameGen(evs).limit(10).add_column("enu_GeV", enu_GeV).all()
print("column index for \"enu_GeV\" = %s" % fr.find_column_index("enu_GeV"))
foobar_idx = fr.find_column_index("foobar")
print("column index for \"foobar\" = %s, == pn.Frame.npos: %s" % (foobar_idx, foobar_idx == pn.Frame.npos))
```

```
column index for "enu_GeV" = 2
column index for "foobar" = 4294967295, == pn.Frame.npos: True
```
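The printed value 4294967295 is 2**32 - 1, the largest unsigned 32-bit integer, which is what -1 becomes when stored in an unsigned 32-bit type; this is the familiar C++ `npos` idiom. The sketch below mimics the find-or-sentinel pattern with a hypothetical `find_column_index` helper over a plain list. In real code, compare against `pn.Frame.npos` rather than the literal, since the width of the underlying integer type is an implementation detail.

```python
NPOS = 2**32 - 1  # equals the 4294967295 printed above


def find_column_index(names, name):
    """Hypothetical helper: index of `name` in `names`, or NPOS if absent."""
    try:
        return names.index(name)
    except ValueError:
        return NPOS


cols = ["evt#", "cvw", "enu_GeV"]
print(find_column_index(cols, "enu_GeV"))          # 2
print(find_column_index(cols, "foobar") == NPOS)   # True
print(NPOS == (-1) % 2**32)                        # True: -1 wrapped to unsigned 32-bit
```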


Using the index operator, `fr["colname"]` returns a reference to a column, so its values can be modified in place.

```python
import numpy as np

fr = pn.FrameGen(evs).limit(10).add_column("enu_GeV", enu_GeV).all()
print("Original\n", fr)

# modify the column of the original frame in place
fr["enu_GeV"] *= 2
print("After *= 2\n", fr)

# numpy operations can be applied to the column objects
fr["enu_GeV"] = np.sqrt(fr["enu_GeV"])
print("After sqrt\n", fr)
```

```
Original
  ------------------------
 | evt# | cvw | enu_GeV |
 ------------------------
 |    0 |   1 |   2.275 |
 |    1 |   1 |    14.3 |
 |    2 |   1 |    2.86 |
 |    3 |   1 |   3.728 |
 |    4 |   1 |    9.08 |
 |    5 |   1 |   3.237 |
 |    6 |   1 |   2.473 |
 |    7 |   1 |   1.916 |
 |    8 |   1 |   1.988 |
 |    9 |   1 |   3.671 |
 ------------------------
After *= 2
  ------------------------
 | evt# | cvw | enu_GeV |
 ------------------------
 |    0 |   1 |    4.55 |
 |    1 |   1 |    28.6 |
 |    2 |   1 |    5.72 |
 |    3 |   1 |   7.457 |
 |    4 |   1 |   18.16 |
 |    5 |   1 |   6.475 |
 |    6 |   1 |   4.946 |
 |    7 |   1 |   3.832 |
 |    8 |   1 |   3.975 |
 |    9 |   1 |   7.343 |
 ------------------------
After sqrt
  ------------------------
 | evt# | cvw | enu_GeV |
 ------------------------
 |    0 |   1 |   2.133 |
 |    1 |   1 |   5.348 |
 |    2 |   1 |   2.392 |
 |    3 |   1 |   2.731 |
 |    4 |   1 |   4.261 |
 |    5 |   1 |   2.545 |
```
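The in-place behavior is consistent with the column being a view into the Frame's storage. The NumPy sketch below assumes nothing about pyNUISANCE internals and uses a plain 2D array as a stand-in for a Frame. It shows the distinction that matters in practice: augmented assignment through a view writes back to the shared storage, while rebinding a name to a new array (such as the result of `np.sqrt`) does not, which is why the cell above assigns the square root back through `fr["enu_GeV"]`.

```python
import numpy as np

# Plain 2D array standing in for a Frame's double storage: columns evt#, cvw, enu_GeV.
storage = np.array([[0.0, 1.0, 2.275],
                    [1.0, 1.0, 14.3],
                    [2.0, 1.0, 2.86]])

col = storage[:, 2]   # basic slicing returns a view that shares memory with storage
col *= 2              # augmented assignment modifies the shared memory in place
doubled = storage[:, 2].copy()

col = np.sqrt(col)    # np.sqrt returns a new array; this only rebinds the name 'col'
unchanged = storage[:, 2].copy()

storage[:, 2] = np.sqrt(storage[:, 2])   # assigning through the index writes back
print(doubled, unchanged, storage[:, 2])
```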

Columns whose names are valid Python identifiers can also be accessed with attribute syntax, as `fr.colname`.

```python
fr = pn.FrameGen(evs).limit(10).add_column("enu_GeV", enu_GeV).all()
print("Original: ", fr.enu_GeV)
fr.enu_GeV += 1
print("After += 1: ", fr.enu_GeV)
```

```
Original:  [ 2.27487048 14.30072308  2.86011408  3.72836792  9.07992245  3.23726147
  2.47278658  1.91614913  1.98767289  3.67133838]
After += 1:  [ 3.27487048 15.30072308  3.86011408  4.72836792 10.07992245  4.23726147
  3.47278658  2.91614913  2.98767289  4.67133838]
```
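Only names that are valid Python identifiers (and not keywords) can be written after a dot, so columns such as "evt#" or "number of protons" still need the `fr["..."]` form. The check below uses only the standard library. The `ToyColumns` class, which forwards unknown attributes to named columns via `__getattr__`, is a hypothetical illustration of one common way to provide attribute access, not a description of how pyNUISANCE implements it.

```python
import keyword


def attribute_accessible(name):
    """True if `name` can be written as obj.name in Python source."""
    return name.isidentifier() and not keyword.iskeyword(name)


class ToyColumns:
    """Hypothetical container forwarding unknown attributes to named columns."""

    def __init__(self, columns):
        self._columns = dict(columns)

    def __getitem__(self, name):
        return self._columns[name]

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails, e.g. for column names.
        try:
            return self.__dict__["_columns"][name]
        except KeyError:
            raise AttributeError(name) from None


fr = ToyColumns({"evt#": [0, 1], "enu_GeV": [2.275, 14.3], "number of protons": [1, 3]})
for name in ["evt#", "enu_GeV", "number of protons"]:
    print(repr(name), attribute_accessible(name))
print(fr.enu_GeV, fr["number of protons"])
```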


## Filters

Filters can be applied in a similar way to the batched loop. Here, the filter keeps only every fifth event:

```python
print(pn.FrameGen(evs)
      .limit(50)
      .filter(lambda x: not (x.event_number() % 5))  # keep events whose number is a multiple of 5
      .add_column("enu_GeV", enu_GeV)
      .all())
```

```
 ------------------------
 | evt# | cvw | enu_GeV |
 ------------------------
 |    0 |   1 |   2.275 |
 |    5 |   1 |   3.237 |
 |   10 |   1 |   2.506 |
 |   15 |   1 |   2.682 |
 |   20 |   1 |   1.528 |
 |   25 |   1 |   3.214 |
 |   30 |   1 |   3.033 |
 |   35 |   1 |   2.014 |
 |   40 |   1 |   14.87 |
 |   45 |   1 |   1.661 |
 ------------------------
```
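The predicate `not (x.event_number() % 5)` is true exactly when the event number is a multiple of 5, so of the 50 events read, 10 rows survive. A toy, pure-Python version of the same read-then-filter logic (the helper `filtered_event_numbers` is hypothetical) reproduces the `evt#` column above:

```python
from itertools import islice


def filtered_event_numbers(events, predicate, limit):
    """Read at most `limit` events and return the numbers of those passing `predicate`."""
    return [evnum for evnum, _ in enumerate(islice(events, limit)) if predicate(evnum)]


kept = filtered_event_numbers(range(10_000), lambda n: not (n % 5), limit=50)
print(kept)  # [0, 5, 10, 15, 20, 25, 30, 35, 40, 45]
```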


## Chunked Processing

For long-running processes that would produce very large data frames, it can be better to do secondary processing on chunks of the full data frame rather than waiting for the whole thing to be ready. Internally, `FrameGen::all` calls `FrameGen::first` and then `FrameGen::next` until the input event stream has no more events to process into new rows. We can steer this chunked processing loop manually from Python.

The chunk size can be set with the second parameter to the `FrameGen` constructor; it defaults to 50,000.

```python
chunk_size = 100
fg = pn.FrameGen(evs, chunk_size).limit(4 * chunk_size)

chunk = fg.first()
nrows = 0
# an empty chunk signals that the event stream is exhausted
while chunk.rows() > 0:
    nrows += chunk.rows()
    print("fetched %s new rows, total fetched: %s" % (chunk.rows(), nrows))
    chunk = fg.next()

print("processed %s rows in total" % nrows)
```

```
fetched 100 new rows, total fetched: 100
fetched 100 new rows, total fetched: 200
fetched 100 new rows, total fetched: 300
fetched 100 new rows, total fetched: 400
processed 400 rows in total
```
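The `first`/`next` loop above has the same shape as consuming a Python generator. As a toy model only (`chunked` is a hypothetical helper, not part of pyNUISANCE), rows can be accumulated and handed out in groups of at most `chunk_size`, with a shorter final chunk if the input runs out:

```python
from itertools import islice


def chunked(events, chunk_size, limit=None):
    """Yield lists of at most chunk_size items from the first `limit` events."""
    chunk = []
    for ev in islice(events, limit):
        chunk.append(ev)          # stand-in for computing one row per event
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []
    if chunk:                     # final, possibly short, chunk
        yield chunk


sizes = [len(c) for c in chunked(range(10_000), chunk_size=100, limit=400)]
print(sizes, sum(sizes))  # [100, 100, 100, 100] 400
```

Where `fg.first()`/`fg.next()` signal the end with an empty chunk, a generator simply stops iterating.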


**N.B.** `FrameGen::limit` limits the number of events read from the event stream, not the number of rows produced, whereas `chunk_size` limits the number of rows returned per call.

```python
chunk_size = 74
fg = (pn.FrameGen(evs, chunk_size)
      .filter(lambda x: not (x.event_number() % 3))  # keep every third event
      .limit(10 * chunk_size))

chunk = fg.first()
nrows = 0
while chunk.rows() > 0:
    nrows += chunk.rows()
    print("fetched %s new rows, total fetched: %s" % (chunk.rows(), nrows))
    chunk = fg.next()

print("processed %s rows in total, total events read %s" % (nrows, chunk.nevents()))
```

```
fetched 74 new rows, total fetched: 74
fetched 74 new rows, total fetched: 148
fetched 74 new rows, total fetched: 222
fetched 25 new rows, total fetched: 247
processed 247 rows in total, total events read 740
```
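These counts follow from simple arithmetic: `limit` caps the events read at 10 × 74 = 740 (event numbers 0 to 739), the filter keeps event numbers 0, 3, ..., 738, and the surviving rows are then split into chunks of at most 74.

```python
import math

chunk_size = 74
events_read = 10 * chunk_size                 # limit() caps events read: 740
rows = math.ceil(events_read / 3)             # event numbers 0, 3, ..., 738 pass the filter
full_chunks, remainder = divmod(rows, chunk_size)
sizes = [chunk_size] * full_chunks + ([remainder] if remainder else [])
print(rows, sizes)  # 247 [74, 74, 74, 25]
```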


**N.B.B.** If you request a chunk, you must process every row in it before breaking out of your processing loop early; otherwise, the normalization information from the last chunk will not correspond to the number of rows that you processed.
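One way to respect this rule is to check any early-stopping condition only at chunk boundaries. The sketch below uses toy chunks (plain lists) and a hypothetical `process_until` helper; it does not model the normalization bookkeeping itself, only a loop structure that keeps the processed rows aligned with whole chunks.

```python
def process_until(chunks, min_rows):
    """Consume whole chunks until at least min_rows rows have been processed."""
    processed = 0
    for chunk in chunks:
        for row in chunk:           # always finish the current chunk...
            processed += 1          # (stand-in for real per-row work)
        if processed >= min_rows:   # ...and only then decide whether to stop
            break
    return processed


toy_chunks = ([i] * 100 for i in range(10))  # ten toy chunks of 100 rows each
result = process_until(toy_chunks, 250)
print(result)  # 300
```

Asking for 250 rows returns 300, because the loop stops only after the chunk that crosses the threshold has been fully processed.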

## Arrow and Pandas

If NUISANCE has been built with Apache Arrow support, `FrameGen` can also produce `arrow::RecordBatch` instances. See the pyarrow documentation for [`RecordBatches`](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html).

You can check whether this build of NUISANCE has Arrow support with:

```python
print(pn.FrameGen.has_arrow_support())
```

```
True
```


A key difference between `nuis::Frame` and `arrow::RecordBatch` is that the columns of a `RecordBatch` can have different types, whereas a `Frame`, which is backed by a single `Eigen::ArrayXXd`, always has storage type `double`. As the printout below shows, the "event number" column in the `RecordBatch` is of type `int64`. Further integer-typed columns can be added with the `FrameGen.add_int_column` and `FrameGen.add_int_columns` functions; `FrameGen.add_column` is simply shorthand for `FrameGen.add_double_column`.

```python
import pyarrow as pa

chunk_size = 100

def enu_GeV(ev):
    bpart = pps.sel.Beam(ev, 14)
    if bpart:
        return bpart.momentum().e() * 1E-3  # event momenta are always in MeV; convert to GeV
    return pn.Frame.missing_datum

def nprotons(ev):
    # count the outgoing protons (PDG code 2212)
    return len(pps.sel.OutParts(ev, 2212))

fg = (pn.FrameGen(evs, chunk_size)
      .add_column("enu_GeV", enu_GeV)
      .add_int_column("number of protons", nprotons)
      .limit(4 * chunk_size))

chunk = fg.firstArrow()
print(chunk)
```

```
pyarrow.RecordBatch
event number: int64
cv weight: double
fatx estimate: double
enu_GeV: double
number of protons: int64
```
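One reason integer column types matter: a `double` has a 53-bit significand, so it represents every integer exactly only up to 2**53. The event numbers and particle counts in this example are far below that limit, so storing them as `double` in a `Frame` is exact; an `int64` column makes the integer type explicit and avoids the limit altogether. This standard-library check shows where exactness breaks down:

```python
# IEEE 754 double precision has a 53-bit significand.
limit = 2**53

print(float(limit) == limit)          # True: 2**53 is exactly representable
print(float(limit + 1) == limit + 1)  # False: 2**53 + 1 rounds to 2**53
print(int(float(limit + 1)))          # 9007199254740992
```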


Because `RecordBatch` instances are also meant to be saved to disk for later analysis, decoupled from the original input events, each one also includes a column holding the running estimate of the flux-averaged total cross section (`fatx`) for each row.

Arrow also provides first-class Pandas integration, so if you prefer doing analysis with a Pandas DataFrame, you can get one from `FrameGen` in one line of Python!

```python
df = chunk.to_pandas()
print(df)
```

```
    event number  cv weight  fatx estimate    enu_GeV  number of protons
0              0        1.0        1.05628   2.274870                  1
1              1        1.0        1.05628  14.300723                  3
2              2        1.0        1.05628   2.860114                  1
3              3        1.0        1.05628   3.728368                  1
4              4        1.0        1.05628   9.079922                  1
..           ...        ...            ...        ...                ...
95            95        1.0        1.05628   4.751948                  1
96            96        1.0        1.05628   2.102326                  2
97            97        1.0        1.05628   2.143340                  2
98            98        1.0        1.05628  16.579393                  1
99            99        1.0        1.05628   3.194466                  1

[100 rows x 5 columns]
```
