# ScaleOut Exercise

**NOTE: If you got an error saying `hats-sci-pi` doesn't exist, close and halt this notebook and run the 1-pyroot-setup notebook, then come back and change the kernel of this notebook back to `hats-sci-pi`**

One important component of scaling out computation is decomposing it into smaller independent chunks, which can then be executed on multiple resources simultaneously. Let's do this on a simplified example to see what this looks like and what can go wrong.
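
To make "independent chunks" concrete before touching any real data, here is a minimal sketch. The workload (a sum of squares) and the helper names `process_chunk` and `split_into_chunks` are hypothetical stand-ins, not part of this exercise. The point is only that each chunk can be processed without knowing about the others, and that the partial results combine into the same answer as a single pass.

```python
# Hypothetical workload: a sum of squares stands in for a real computation.
def process_chunk(chunk):
    # Depends only on its own input, so chunks can run anywhere, in any order
    return sum(x ** 2 for x in chunk)

def split_into_chunks(items, chunk_size):
    # Slice the work list into consecutive pieces of at most chunk_size items
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

data = list(range(10))
chunks = split_into_chunks(data, 4)
partial_sums = [process_chunk(chunk) for chunk in chunks]
total = sum(partial_sums)

print(chunks)
print(partial_sums, total)
# The decomposed result matches processing everything in one go
print(total == process_chunk(data))
```

Here the chunks are still processed one after another. Since no chunk needs anything from its neighbours, the list comprehension could later be handed to separate threads, processes, or machines.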

In [None]:
import ROOT as r
r.gDebug = 0
import json
import pprint
import random
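try:
    from configparser import RawConfigParser   # Python 3 module name
except ImportError:
    from ConfigParser import RawConfigParser   # Python 2 module name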
config = RawConfigParser()   
config.optionxform = str       # Last two lines are done because ConfigParser will not preserve case
config.read("hatsConfig.ini")
fullCrossSections = dict([sample, float(xsec)] for sample, xsec in config.items('hatsXsects'))
nProcessed    = dict([sample, int(nPro)] for sample, nPro in config.items('hatsNprocessed'))
varNames=['dijetEtBalance', 'dijetMass']
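# Read the per-sample file lists, closing the file once it has been parsed
with open("filelist.json") as fileListHandle:
    fileList = json.load(fileListHandle)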
fullHatsChains = {}
shortHatsChains = {}
for sample in fileList.keys():
    chain = r.TChain('hatsDijets')
    shortChain = r.TChain('hatsDijets')
    random.shuffle(fileList[sample])
    sampleList = fileList[sample]
    shortChain.Add("/mnt/hdfs/" + sampleList[0])
    for hatsFile in sampleList:
        chain.Add("/mnt/hdfs/" + hatsFile)
    fullHatsChains[sample] = chain
    shortHatsChains[sample] = shortChain

### Change this to run on either the full or the short dataset
As you're debugging, it can help to run on a shortened dataset.

In [None]:
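# Full dataset: uncomment the next line and comment out the short one below
#hatsChains = fullHatsChains ; crossSections = fullCrossSections
# Short dataset: one file per sample, and only the first two samples
hatsChains = shortHatsChains ; crossSections = dict(list(fullCrossSections.items())[:2])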

## An "Analysis"
To give us something to work with, consider the analysis below. This code will plot several kinematic variables from multiple samples, then store the histograms in a variable named `hists`.

This is obviously trivial (and incredibly slow), but the "meat" of the analysis isn't what's important here. It's the decomposition we care about. Looking at this code, what can be pulled apart?

In [None]:
hists = {}
import time
tic = time.time()
for sample in crossSections.keys():
    for varName in varNames:
        hatsChains[sample].SetBranchStatus('*', 0)
        hatsChains[sample].SetBranchStatus(varName, 1)
        histLabel = "%s_%s" % (varName, sample)
        hists[histLabel]=r.TH1F(histLabel, histLabel, 100, 0, 0)
        hatsChains[sample].Draw("%s>>%s" % (varName, histLabel))
toc = time.time()
print("Time elapsed: %0.2fsecs" % (toc - tic))
pprint.pprint(hists)
canvas = r.TCanvas()
hists['dijetEtBalance_QCD_HT500to700'].Draw()
canvas.Draw()



## The technique
One method of decomposition is to separate a loop from its body, so the body can be explicitly run elsewhere and still produce the same results.

It's important that the decomposed body doesn't change or access any global state (think about why that's necessary to scale out on multiple CPUs or machines). What in the body of this loop should be changed?
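
To see why shared state gets in the way, here is a small sketch. The names `state`, `impure_scale` and `pure_scale` are made up for illustration and do not appear elsewhere in this notebook. A body that reads and modifies shared state gives answers that depend on the order in which the calls happen, and once work is spread over several CPUs or machines that order is no longer under our control.

```python
# Hypothetical example: shared state makes results depend on call order.
state = {"calls": 0}

def impure_scale(x):
    # Reads and modifies shared state, so the answer depends on call history
    state["calls"] += 1
    return x * state["calls"]

def pure_scale(x, factor):
    # Everything it needs arrives as arguments; nothing outside is touched
    return x * factor

inputs = [1, 2, 3]

forward = {x: impure_scale(x) for x in inputs}
state["calls"] = 0  # reset, then process the same inputs in reverse order
backward = {x: impure_scale(x) for x in reversed(inputs)}
print(forward == backward)   # False: same inputs, different answers

pure_forward = {x: pure_scale(x, 2) for x in inputs}
pure_backward = {x: pure_scale(x, 2) for x in reversed(inputs)}
print(pure_forward == pure_backward)   # True: order does not matter
```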

In [None]:
vals = {}
tic = time.time()
for x in range(8):
    time.sleep(1)
    vals[x] = x ** 2
toc = time.time()
print("Time elapsed: %0.2fsecs" % (toc - tic))
print(vals)

## Parallelizing
With some thought, we can decompose the body of our loop into a new function `sleep_some`.

Note that two changes were made:
1. The loop variable `x`, which is normally passed implicitly from the loop into the body, is turned into an explicit argument of our new function.
2. Instead of modifying `vals` to get data out of the body of the loop, we pass the result back to the caller as a return value.

Once we have our new function, we can use the `ThreadPoolExecutor` from `concurrent.futures` to execute all 8 of our loop bodies in separate threads at the same time. Each body just sleeps for one second, so the loop should finish in about 1 second instead of 8, roughly 8 times as fast as before.
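
One detail worth knowing when you later compare outputs: `map` on an executor hands results back in the order of the inputs, even if the tasks finish in a different order, while `submit` combined with `concurrent.futures.as_completed` yields them as they finish. The sketch below uses a made-up function `slow_square`, whose sleep times are chosen so that later inputs tend to finish first. It is an illustration under those assumptions, not part of the exercise.

```python
import concurrent.futures
import time

def slow_square(x):
    # Hypothetical task: larger inputs sleep less, so they tend to finish first
    time.sleep(0.05 * (4 - x))
    return (x, x ** 2)

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    # map() yields results in input order, regardless of finishing order
    mapped = list(pool.map(slow_square, range(4)))

    # submit() returns one future per call; as_completed() yields each
    # future as soon as it is done, so this order can vary from run to run
    futures = [pool.submit(slow_square, x) for x in range(4)]
    completed = [f.result() for f in concurrent.futures.as_completed(futures)]

print(mapped)
print(completed)
# Both approaches produce the same set of results
print(sorted(completed) == mapped)
```

Because each result carries its own key (here `x`), turning either list into a `dict` gives the same mapping, which is why returning `(x, x ** 2)` pairs is a convenient pattern.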

In [None]:
# Split our "important" code into a separate function
def sleep_some(x):
    time.sleep(1)
    return (x, x ** 2)

# Make a thread pool to execute it
import concurrent.futures
tic = time.time()   
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as p:
    # Run one copy of every function on a separate thread simultaneously
    ret = p.map(sleep_some, range(8))
# Convert our return value into a dict to match the output from the previous cell
vals = dict(ret)
toc = time.time()
print("Time elapsed: %0.2fsecs" % (toc - tic))
print(vals)

## Your turn

Following the pattern above, decompose our "analysis" to use multiple threads. Once you have it debugged, run both the original and modified versions with the full dataset and compare the runtimes. Make sure the outputs are the same!

In [None]:
#
# YOUR CODE HERE
#

print("Time elapsed: %0.2fsecs" % (toc - tic))
pprint.pprint(hists)
canvas = r.TCanvas()
hists['dijetEtBalance_QCD_HT500to700'].Draw()
canvas.Draw()

### What happened?

In [None]:
# Split our "important" code into a separate function
def sleep_some(x):
    time.sleep(1)
    return (x, x ** 2)


# Make a process pool this time, instead of a thread pool
import concurrent.futures    
tic = time.time()   
with concurrent.futures.ProcessPoolExecutor(max_workers=8) as p:
    # Run sleep_some once per input, spread across separate worker processes
    ret = p.map(sleep_some, range(8))

toc = time.time()
print("Time elapsed: %0.2fsecs" % (toc - tic))
print(dict(ret))