## Data access demo

In this demo, we will take a look at the different options we have for accessing our ML training/testing data. We have two different methods:

- Read all the data into memory -- a `pandas.DataFrame` for the scalar-type branches (i.e. one value per topo-cluster), and a dictionary of `numpy` arrays for the calorimeter images. This can *also* create HDF5 files holding these data objects, for faster access in the future (i.e. so we don't have to build them from the `ROOT` files again). Because everything is held in memory, data access is fast, but we could run into memory issues when using a lot of data.
- Stream the data from the `ROOT` files, using a custom `tf.data.Dataset` class. This is notably slower than reading everything into memory at once, but it scales well, so we can deal with very large datasets.
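
The trade-off between the two methods can be illustrated with a small, library-free sketch. Everything here is hypothetical: `read_file` is a stand-in for reading one `ROOT` file, and the file list is made up. It is not how our utilities are implemented; it only shows that the in-memory approach holds every event at once, while the streaming approach only ever holds one file's worth of events and hands them out batch by batch.

```python
def read_file(n_events, offset):
    """Hypothetical stand-in for reading one ROOT file: returns fake events."""
    return [offset + i for i in range(n_events)]

# Hypothetical "files", each described by (number of events, first event ID).
files = [(1000, 0), (1000, 1000), (1000, 2000)]

def load_all(files):
    """In-memory approach: read every file up front and keep all events."""
    data = []
    for n_events, offset in files:
        data.extend(read_file(n_events, offset))
    return data

def stream_batches(files, batch_size):
    """Streaming approach: read one file at a time and yield batches from it.

    For simplicity, batches do not span file boundaries in this sketch.
    """
    for n_events, offset in files:
        events = read_file(n_events, offset)
        for start in range(0, len(events), batch_size):
            yield events[start:start + batch_size]

in_memory = load_all(files)

n_streamed = 0
largest_batch = 0
for batch in stream_batches(files, batch_size=600):
    n_streamed += len(batch)
    largest_batch = max(largest_batch, len(batch))

print(len(in_memory), n_streamed, largest_batch)
```

Both approaches see the same events, but the streaming loop has to go back to the files on every pass over the data, which is where the extra time goes.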

In this demo, we will use both methods to train a simple classifier. Doing this, we'll also see some examples of using our topo-cluster classification networks & network training utilities.

In [1]:
import sys,os,glob,time
import numpy as np
import tensorflow as tf
import uproot as ur
import ROOT as rt

2021-10-11 18:41:56.958880: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0


Welcome to JupyROOT 6.24/02


In [2]:
# Our custom stuff.
path_prefix = os.getcwd() + '/../'
if(path_prefix not in sys.path): sys.path.append(path_prefix)
from util import resolution_util as ru
from util import plot_util as pu
from util import ml_util as mu
from util import qol_util as qu
from util import data_util as du

# Some more custom stuff (tf.keras things).
from util.keras.callbacks import GetCallbacks

# Some more custom stuff (classification-specific).
from util.classification import data_util as cdu
from util.classification import training_util as ctu
from util.classification.models import baseline_nn_model

Let's fetch our data. Note that the paths here are specific to however you have stored the data!

In [3]:
inputpath=path_prefix+'data/pion/'
rootfiles = {        
    'p0':inputpath + 'user.mswiatlo.900246.PG_singlepi0_logE0p2to2000.recon.ESD.e8312_e7400_s3170_r12383.images_v01.1_OutputStream/*.root',
    'pp':inputpath + 'user.mswiatlo.900247.PG_singlepion_logE0p2to2000.recon.ESD.e8312_e7400_s3170_r12383.images_v01.1_OutputStream/*.root'
}

# Let's explicitly list the root files for each category, so that we can trim down the lists (makes this run faster!).
# Our methods below can use either the glob-compatible strings above, or the lists we're making.
nfiles = 3
rootfiles = {key:glob.glob(val,recursive=True)[:nfiles] for key,val in rootfiles.items()}

# Names for our scalar-type branches (cluster energy, eta, etc.). 
# We are not actually using them in this demo, besides a trivial clusterEta cut, but this demonstrates how one would access them.
branches = [
            'cluster_ENG_CALIB_TOT','clusterEta'
]

layers = ['EMB1']

Let's define our network architecture -- it will be used in both parts below.

In [4]:
lr = 1.0e-4
nepochs = 3
patience = 2
batch_size = 600 # relatively large batch size -- but this makes sure things run rather quickly for the demo (esp. relevant when streaming).
gamma = 0.1
min_delta = 0.0001
dropout = 0.1
normalization = True
verbose = 1

In [5]:
layer = 'EMB1'
assert(layer in layers)
npix = mu.cell_meta[layer]['len_eta'] * mu.cell_meta[layer]['len_phi']
architecture = baseline_nn_model(None, npix, lr=lr, dropout=dropout, normalization=normalization,input_name=layer) # TODO: strategy is unused

## Part 1: In-memory data

Now let's load the data into memory, using the `pandas` + `numpy` approach. 

In [6]:
h5_name_suffix = 'data_demo'
h5_name = inputpath + h5_name_suffix
modelfile = 'demo_network_memory.h5'

In [7]:
cuts = [
#    ['cluster_ENG_CALIB_TOT',.2,'lower'], # Note: I've commented this out since our data-streaming method (part 2) can't do cuts yet, and we want an apples-to-apples comparison to the extent possible.
    ['clusterEta',(-0.7,0.7),'window'] # This is actually a redundant cut -- already present in our data, but it demonstrates how one applies one.
]

cut_distributions = [x[0] for x in cuts]
cut_values = [x[1] for x in cuts]
cut_types = [x[2] for x in cuts]
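
As a rough illustration of what these three lists encode, here is a minimal sketch of applying such cuts to a list of events. The helper names `passes_cut` and `apply_cuts` are hypothetical, and the exact conventions are assumptions (in particular that `'lower'` keeps values above the threshold, and that the boundaries are exclusive); the real logic lives inside `ml_util.setupPionData()`.

```python
def passes_cut(value, cut_value, cut_type):
    """Return True if `value` survives the cut (boundary handling is an assumption)."""
    if cut_type == 'lower':   # assumed: cut_value is a lower bound
        return value > cut_value
    if cut_type == 'upper':   # assumed: cut_value is an upper bound
        return value < cut_value
    if cut_type == 'window':  # cut_value is a (low, high) tuple
        low, high = cut_value
        return low < value < high
    raise ValueError('Unknown cut type: {}'.format(cut_type))

def apply_cuts(events, cut_distributions, cut_values, cut_types):
    """Keep only the events that pass every cut."""
    selected = []
    for event in events:
        checks = zip(cut_distributions, cut_values, cut_types)
        if all(passes_cut(event[d], v, t) for d, v, t in checks):
            selected.append(event)
    return selected

# Made-up events, each with a single scalar branch.
events = [{'clusterEta': -0.9}, {'clusterEta': 0.1},
          {'clusterEta': 0.65}, {'clusterEta': 1.2}]

kept = apply_cuts(events, ['clusterEta'], [(-0.7, 0.7)], ['window'])
print([e['clusterEta'] for e in kept])
```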

The `ml_util.setupPionData()` function handles our data preparation. For those familiar with previous methods (e.g. those used in Max's Jupyter notebooks), this is quite similar, but a number of things are handled under the hood. In the example below, `pdata` will be a `pandas.DataFrame` with our scalar-type branches, and `pcells` will be a dictionary of `numpy` arrays representing the calorimeter images.

In [8]:
pdata,pcells = mu.setupPionData(
    rootfiles, 
    branches=branches, 
    layers=layers, 
    balance_data=True, # Whether or not to make sure to have equal numbers of pi0 and pi+ events
    n_max = -1, # take this many events (or max possible) from pi0 and pi+. Set to -1 to take the max possible w/out upper cap. Not relevant if not balancing the data.
    verbose=True,
    load=False, # if True, will load files w/ requested names if they exist
    save=False, # if True, saves some HDF5 files
    filename=h5_name,
    match_distribution='cluster_ENG_CALIB_TOT', # balance the data by matching this distribution for pi0 and pi+
    match_binning = (20000,0.,2000.), # binning for doing the matching
    cut_distributions=cut_distributions, # cut on these distributions
    cut_values=cut_values, # defines the cuts for the above distributions (single value for upper/lower, tuple for window)
    cut_types=cut_types # defines the cut type for the above distributions (lower, upper or window)
)

Applying cut on distribution: clusterEta.
Matching data series on distribution: cluster_ENG_CALIB_TOT.
Balancing data: 81821 events per category.
Preparing pandas DataFrame.
Preparing calorimeter images. |████████████████████████████████████████| 100.0% Complete
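
The "matching" and "balancing" steps reported above can be pictured with the following sketch. This is an assumption about the general idea, not the actual implementation in `ml_util`: both categories are histogrammed in the matching variable, and in each bin we keep the same number of events from each category (the smaller of the two counts). The function names are hypothetical, and the binning tuple is assumed to mean (number of bins, lower edge, upper edge).

```python
import random

def bin_index(x, nbins, xmin, xmax):
    """Return the histogram bin of x, or None if x is outside [xmin, xmax)."""
    if x < xmin or x >= xmax:
        return None
    k = int((x - xmin) / (xmax - xmin) * nbins)
    return min(k, nbins - 1)  # guard against floating-point round-up

def group_by_bin(values, binning):
    """Map each occupied bin to the list of event indices falling into it."""
    nbins, xmin, xmax = binning
    groups = {}
    for i, x in enumerate(values):
        k = bin_index(x, nbins, xmin, xmax)
        if k is not None:
            groups.setdefault(k, []).append(i)
    return groups

def match_on_distribution(values_a, values_b, binning, seed=0):
    """Select equally many events per bin from two categories."""
    rng = random.Random(seed)
    groups_a = group_by_bin(values_a, binning)
    groups_b = group_by_bin(values_b, binning)
    keep_a, keep_b = [], []
    for k in sorted(groups_a.keys() & groups_b.keys()):
        n = min(len(groups_a[k]), len(groups_b[k]))
        keep_a += rng.sample(groups_a[k], n)
        keep_b += rng.sample(groups_b[k], n)
    return sorted(keep_a), sorted(keep_b)

# Made-up energies for two categories, with 4 bins between 0 and 4.
energies_p0 = [0.5, 0.6, 1.5, 2.5]
energies_pp = [0.55, 1.2, 1.7, 3.5]
keep_p0, keep_pp = match_on_distribution(energies_p0, energies_pp, (4, 0., 4.))
print(len(keep_p0), len(keep_pp))
```

After such a selection the two categories have the same number of events and the same binned distribution in the matching variable, which is what makes the accuracy straightforward to interpret.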


Now let's define our indices for training, validation and testing. Note that we accomplish this with a function from the `util.classification` sub-library.

In [9]:
pdata_merged, pcells_merged, plabels = cdu.DataPrep(pdata, 
                                                    pcells, 
                                                    layers, 
                                                    trainfrac=0.7,
                                                    filename='' # if not blank, will save an HDF5 file containing indices
                                                   )
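
To make the role of `trainfrac` concrete, here is a minimal sketch of one way to split event indices into training, validation and testing sets. The function `split_indices` is hypothetical, and the even split of the remaining events between validation and testing is an assumption; `cdu.DataPrep()` may well divide them differently.

```python
import random

def split_indices(n_events, trainfrac, seed=0):
    """Shuffle event indices and split them into train/val/test lists.

    Assumption: the events not used for training are shared evenly
    between validation and testing.
    """
    indices = list(range(n_events))
    random.Random(seed).shuffle(indices)
    n_train = round(trainfrac * n_events)
    n_val = (n_events - n_train) // 2
    return {
        'train': indices[:n_train],
        'val': indices[n_train:n_train + n_val],
        'test': indices[n_train + n_val:],
    }

split = split_indices(1000, trainfrac=0.7)
print({key: len(val) for key, val in split.items()})
```

In the notebook, the equivalent information is exposed through `pdata_merged.train` and `pdata_merged.val`, which are used below to index the images and labels.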

Now let's train a simple DNN using our data (it will only use the `EMB1` layer from our calorimeter images).

The `util.classification.training_util.TrainNetwork()` function below will train our neural network (or load it from file, if it already exists and we are not overwriting it). It returns the trained model, as well as a "history" object containing metrics like network accuracy and ROC curve AUC as a function of epoch. The trained model will be saved to an HDF5 file, and the history will be saved to a CSV file, both using the `modelfile` parameter to determine the filename.

The callbacks provided by our `GetCallbacks()` function include a learning rate scheduler (which causes the learning rate to undergo exponential decay from one epoch to the next), as well as an early stopping condition that will stop training if the validation loss does not improve after a certain number of epochs.
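
The two behaviours described above can be sketched in plain Python. This is only an illustration under stated assumptions: the exact decay formula used by `GetCallbacks()` is not shown in this notebook, so the form `lr0 * exp(-gamma * epoch)` is a guess, and the function names are hypothetical. The early-stopping logic follows the usual convention that an improvement must exceed `min_delta`, and that training stops once `patience` consecutive epochs pass without one.

```python
import math

def decayed_lr(lr0, gamma, epoch):
    """Exponentially decayed learning rate (assumed form of the schedule)."""
    return lr0 * math.exp(-gamma * epoch)

def early_stop_epoch(val_losses, patience, min_delta):
    """Return the (0-based) epoch at which training would stop, or None."""
    best = float('inf')
    wait = 0
    for epoch, loss in enumerate(val_losses):
        if best - loss > min_delta:  # a real improvement: reset the counter
            best = loss
            wait = 0
        else:
            wait += 1
            if wait >= patience:
                return epoch
    return None

# Learning rate over the first few epochs, using this notebook's settings.
for epoch in range(3):
    print(epoch, decayed_lr(1.0e-4, 0.1, epoch))

# Made-up validation losses: the loss stalls after the second epoch.
print(early_stop_epoch([1.0, 0.9, 0.9, 0.9, 0.5], patience=2, min_delta=0.0001))
```

With `nepochs = 3` and `patience = 2`, as in this demo, early stopping can only trigger if the validation loss fails to improve in both the second and third epochs.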

In [10]:
start_time_1 = time.time()

model,history = ctu.TrainNetwork(
    model=architecture,
    modelfile=modelfile,
    x_train = pcells_merged[layer][pdata_merged.train],
    y_train = plabels[pdata_merged.train],
    x_valid = pcells_merged[layer][pdata_merged.val],
    y_valid = plabels[pdata_merged.val],
    callbacks = GetCallbacks(modelfile, append=True, use_decay=True, gamma=gamma, min_delta=min_delta, patience=patience),
    epochs=nepochs,
    batch_size=batch_size,
    verbose=verbose,
    overwriteModel=True, # whether or not to overwrite an existing file (modelfile) -- if not, load that file's network
    finishTraining=False # if loading a model, try to train to last requested epoch if training had ended early (e.g. due to patience parameter)
)

end_time_1 = time.time()

Epoch 1/3
Epoch 2/3
Epoch 3/3


2021-10-11 18:42:13.740329: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-10-11 18:42:13.794628: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1733] Found device 0 with properties: 
pciBusID: 0000:18:00.0 name: Quadro P5000 computeCapability: 6.1
coreClock: 1.7335GHz coreCount: 20 deviceMemorySize: 15.90GiB deviceMemoryBandwidth: 269.00GiB/s
2021-10-11 18:42:13.794662: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-10-11 18:42:13.797774: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcublas.so.11
2021-10-11 18:42:13.797867: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcublasLt.so.11
2021-10-11 18:42:13.798680: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcufft.so.10


## Part 2: Using file streaming

Now let's do the same thing as above, but using our data streaming instead of loading the data into memory. A few notes and caveats:

- You might have noticed that our `util.ml_util.setupPionData()` function took a *dictionary* of `ROOT` files, where the keys represented the different categories (e.g. signal vs. background) and the values were lists of `ROOT` files. Our `MLTreeV1Dataset` class can take either such a dictionary or a plain list of `ROOT` files. If using a dictionary, the keys once again represent the different categories. If using a list, one must also supply a `target` argument, which is the name of a branch in the `ROOT` files' trees -- this can be used for regression, for example to measure the calibration hits of topo-clusters.
- Using a larger batch size will increase memory consumption, but notably speed things up as well -- thus there is some balance to strike.
- You can specify a `step_size`, which determines the size of the buffer used for reading the files. The larger the buffer, the faster the reading should be -- but this will increase memory usage. If not specified, the `step_size` will default to as many megabytes as there are elements in a single batch (e.g. if the batch size is 200, then `step_size` will default to `"200 MB"`).
- As with our other data method, we can specify which branches we wish to use, here via the `scalar_branches` and `matrix_branches` arguments. I think that it is good practice to only select the branches you really need, as this may speed things up.
- We don't yet have any way of performing pre-selection cuts on the data -- so the `ROOT` files being used should have any cuts already applied (by contrast, we performed some cuts in Part 1). We may later add this functionality. Similarly, we do not have a way of "balancing" the data (matching signal and background on some distribution). For the time being, one may accomplish these things via some separate pre-processing of the `ROOT` files, but in the long-term I would like to build it into this data streaming class for ease-of-use.
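
The default `step_size` rule described in the list above amounts to the following. The helper `resolve_step_size` is hypothetical and is not part of `data_util`; it only restates the rule in code.

```python
def resolve_step_size(batch_size, step_size=None):
    """Return the read-buffer size as a string such as "200 MB".

    If no step_size is given, default to as many megabytes as there
    are elements in a single batch.
    """
    if step_size is None:
        return '{} MB'.format(batch_size)
    return step_size

print(resolve_step_size(200))           # default for a batch size of 200
print(resolve_step_size(600))           # default for this demo's batch size
print(resolve_step_size(600, '50 MB'))  # an explicit value is left untouched
```

With the batch size of 600 used in this demo and `step_size = None`, the buffer would therefore default to `"600 MB"`.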

In [11]:
modelfile = 'demo_network_streaming.h5'

In [12]:
data_stream = du.MLTreeV1Dataset(
    root_files = rootfiles,
    tree_name = 'ClusterTree',
    scalar_branches = branches,
    matrix_branches = ['EMB1'], # Note: the default argument is the full list fetched via ml_util.cell_meta.keys()
    target = 'cluster_ENG_CALIB_TOT', # Note: For classification we aren't actually using this target -- it will print a warning letting us know.
    batch_size = batch_size,
    shuffle = True,
    step_size = None,
    prefetch = True,
    flatten_images = True # whether or not to flatten the image branches
    #key_map = {'EMB1':'input'} # remap the EMB1 Tensor to a Tensor named "input" (which is what the network expects the input tensor to be named)
)



In [13]:
start_time_2 = time.time()

model,history = ctu.TrainNetwork(
    model=architecture,
    modelfile=modelfile,
    data_train = data_stream,
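    data_valid = data_stream, # Note: for this demo we reuse the training stream for validation, so the validation metrics are not independent of the training data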
    callbacks = GetCallbacks(modelfile, append=True, use_decay=True, gamma=gamma, min_delta=min_delta, patience=patience),
    epochs=nepochs,
    batch_size=batch_size,
    verbose=verbose,
    overwriteModel=True, # whether or not to overwrite an existing file (modelfile) -- if not, load that file's network
    finishTraining=False # if loading a model, try to train to last requested epoch if training had ended early (e.g. due to patience parameter)
)

end_time_2 = time.time()

Epoch 1/3




Epoch 2/3
Epoch 3/3


Given the lack of "data balancing" for our streaming method, we can only make a limited comparison between the network results (ideally they would be identical, but the training conditions differ for now). If our full dataset has more signal than background or vice-versa, interpreting the accuracy printed above is not so simple.
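
To see why accuracy is hard to interpret without balancing, consider a trivial "classifier" that always predicts the more common category. The sketch below uses made-up label lists and hypothetical helper names; it is not tied to our actual dataset.

```python
def accuracy(labels, predictions):
    """Fraction of predictions that match the true labels."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

def majority_baseline_accuracy(labels):
    """Accuracy obtained by always predicting the most common label."""
    counts = {}
    for y in labels:
        counts[y] = counts.get(y, 0) + 1
    majority = max(counts, key=counts.get)
    return accuracy(labels, [majority] * len(labels))

balanced = [0] * 500 + [1] * 500    # equal numbers of both categories
imbalanced = [0] * 200 + [1] * 800  # one category dominates

print(majority_baseline_accuracy(balanced))
print(majority_baseline_accuracy(imbalanced))
```

On the balanced sample this baseline sits at 50%, whereas on the imbalanced sample it already reaches 80% without having learned anything, so an accuracy quoted for unbalanced data should be compared against the corresponding baseline.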

In [14]:
dt_1 = end_time_1 - start_time_1
dt_2 = end_time_2 - start_time_2

print('Total training time for in-memory data: {}s'.format(dt_1))
print('Total training time for streaming data: {}s'.format(dt_2))

Total training time for in-memory data: 5.481272459030151s
Total training time for streaming data: 452.71575379371643s


In [15]:
# -- Quick test: Plot integrals of images, make sure things look OK --

# n = 50
# data_stream_mini = data_stream.take(n)

# n = len(data_stream_mini)
# sums = np.zeros(n)
# for i,x in enumerate(data_stream_mini):
#     sums[i] = np.sum(x[0]['EMB1'].numpy())
    
# rt.gStyle.SetOptStat(1)
# c = rt.TCanvas(qu.RN(),'',800,600)
# h = rt.TH1F(qu.RN(),'',1000,0.,100.)

# for s in sums: h.Fill(s)
# h.Draw()
# c.Draw()