# Using DASK

This notebook fetches data from ServiceX and processes it with `coffea` on a local `dask` cluster.

In [1]:
from make_it_sync import make_sync
import matplotlib.pyplot as plt
from coffea import hist, processor

from sx_multi import FuncAdlQastle, sx_event_stream, process_coffea_dask

# Fetching the data

We want to pull back only electrons with $p_T > 30$ GeV and $|\eta| < 2.5$. Since this is a deliberately simple analysis, we also keep only events that have exactly two such electrons.

In [2]:
ds = FuncAdlQastle()
leptons_per_event_query = ds \
        .Select(lambda e: e.Electrons("Electrons")) \
        .Select(lambda eles: eles.Where(lambda e: e.pt()/1000.0 > 30.0)) \
        .Select(lambda eles: eles.Where(lambda e: abs(e.eta()) < 2.5)) \
        .Where(lambda eles: len(eles) == 2) \
        .Select(lambda ls: (ls.Select(lambda e: e.pt()/1000.0), ls.Select(lambda e: e.eta()), ls.Select(lambda e: e.phi()), ls.Select(lambda e: e.m()/1000.0))) \
        .AsROOTTTree('data.root', 'mytree', ('ElePt', 'EleEta', 'ElePhi', 'EleM'))
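The `func_adl` query above runs on the ServiceX backend, not in this notebook. To make the selection concrete, here is a minimal sketch of the same logic as a plain-Python loop. It assumes each event is a list of `(pt, eta, phi, m)` tuples with momenta and masses in MeV, as the xAOD accessors return them (the query divides by 1000 to get GeV). `select_dielectron_events` is a hypothetical helper and is not part of ServiceX.

```python
from typing import List, Tuple

# (pt [MeV], eta, phi, m [MeV]) for one electron; layout assumed for illustration
Electron = Tuple[float, float, float, float]


def select_dielectron_events(events: List[List[Electron]]) -> List[dict]:
    """Apply the same cuts as the func_adl query, one event at a time."""
    selected = []
    for electrons in events:
        # Keep electrons with pT > 30 GeV and |eta| < 2.5
        good = [e for e in electrons if e[0] / 1000.0 > 30.0 and abs(e[1]) < 2.5]
        # Keep only events with exactly two such electrons
        if len(good) != 2:
            continue
        # One row per event, with the same columns the query writes to the tree (GeV)
        selected.append({
            "ElePt": [e[0] / 1000.0 for e in good],
            "EleEta": [e[1] for e in good],
            "ElePhi": [e[2] for e in good],
            "EleM": [e[3] / 1000.0 for e in good],
        })
    return selected


events = [
    [(45000.0, 0.1, 0.0, 0.511), (44000.0, -0.3, 3.0, 0.511)],  # both electrons pass
    [(45000.0, 0.1, 0.0, 0.511), (25000.0, 0.2, 3.0, 0.511)],   # second fails the pT cut
    [(45000.0, 0.1, 0.0, 0.511), (44000.0, 2.7, 3.0, 0.511)],   # second fails the |eta| cut
]
print(len(select_dielectron_events(events)))  # 1
```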

Next, the dataset identifier (DID) we want to scan:

In [3]:
did = 'mc15_13TeV:mc15_13TeV.361106.PowhegPythia8EvtGen_AZNLOCTEQ6L1_Zee.merge.DAOD_STDM3.e3601_s2576_s2132_r6630_r6264_p2363_tid05630052_00'

# Define Coffea Process Function

This processor works on the data produced by the ServiceX query above. Each call handles a single file: the file is opened, and the invariant mass of the electron pair is built from its contents.
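The invariant mass built in `process` can be checked by hand. The sketch below (plain Python; the function names are hypothetical) converts each electron's $(p_T, \eta, \phi, m)$ to Cartesian components $(p_x, p_y, p_z, E)$, adds the two four-vectors, and takes $m = \sqrt{E^2 - |\vec p|^2}$. For massless particles this reduces to $m^2 = 2\,p_{T,1}\,p_{T,2}\,(\cosh\Delta\eta - \cos\Delta\phi)$, which serves as an independent cross-check. Inputs are assumed to be in GeV.

```python
import math
from typing import Tuple


def to_cartesian(pt: float, eta: float, phi: float, m: float) -> Tuple[float, float, float, float]:
    """Convert (pT, eta, phi, m) to (px, py, pz, E)."""
    px = pt * math.cos(phi)
    py = pt * math.sin(phi)
    pz = pt * math.sinh(eta)
    e = math.sqrt(px**2 + py**2 + pz**2 + m**2)
    return px, py, pz, e


def pair_mass(e1, e2) -> float:
    """Invariant mass of the sum of two (pT, eta, phi, m) tuples."""
    px1, py1, pz1, en1 = to_cartesian(*e1)
    px2, py2, pz2, en2 = to_cartesian(*e2)
    px, py, pz, en = px1 + px2, py1 + py2, pz1 + pz2, en1 + en2
    # Clamp tiny negative values caused by floating-point rounding
    return math.sqrt(max(en**2 - px**2 - py**2 - pz**2, 0.0))


def massless_pair_mass(e1, e2) -> float:
    """Massless approximation: m^2 = 2 pT1 pT2 (cosh(d_eta) - cos(d_phi))."""
    return math.sqrt(2 * e1[0] * e2[0] * (math.cosh(e1[1] - e2[1]) - math.cos(e1[2] - e2[2])))


# Two back-to-back 45.6 GeV electrons (masses neglected) should give about 91.2 GeV
a = (45.6, 0.0, 0.0, 0.0)
b = (45.6, 0.0, math.pi, 0.0)
print(pair_mass(a, b), massless_pair_mass(a, b))
```

For real electrons the mass term is negligible at these momenta, so the two functions agree to high precision.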

In [4]:
accumulator = processor.dict_accumulator({
})

In [5]:
import awkward as ak
from coffea.nanoevents.methods import vector


class ZMassProcessor(processor.ProcessorABC):
    def __init__(self):
        self._accumulator = processor.dict_accumulator({
            "sumw": processor.defaultdict_accumulator(float),
            "mass": hist.Hist(
                "Events",
                hist.Bin("mass", "$Z_{ee}$ [GeV]", 60, 60, 120),
            ),
        })

    @property
    def accumulator(self):
        return self._accumulator

    def process(self, events):
        # The template of the output we will send back.
        output = self.accumulator.identity()

        # Because we aren't using a NanoEvents schema, build the electron
        # four-vectors by hand. The query does not request the charge, so
        # use a plain Lorentz vector rather than a charged candidate.
        electrons = ak.zip({
            "pt": events.ElePt,
            "eta": events.EleEta,
            "phi": events.ElePhi,
            "mass": events.EleM,
        }, with_name="PtEtaPhiMLorentzVector", behavior=vector.behavior)

        # The ServiceX query already kept only events with exactly two
        # electrons passing the cuts, so form the pair directly.
        diele = electrons[:, 0] + electrons[:, 1]

        # Only one dataset is processed here, so a fixed label is used.
        output["sumw"]["Zee"] += len(events)
        output["mass"].fill(
            mass=ak.to_numpy(diele.mass),
        )

        return output

    def postprocess(self, final_results):
        return final_results
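Because `process` sees one file at a time, each call returns a partial result, and the partial results are added together afterwards. The accumulator pattern amounts to: start from an empty copy (`identity()`), fill it, then sum the copies. The plain-Python sketch below mimics that pattern with a 60-bin histogram over 60 to 120 GeV, matching the `hist.Bin` above. It is not coffea's implementation; `MassHistogram` and `process_file` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, List

N_BINS, LOW, HIGH = 60, 60.0, 120.0  # same binning as hist.Bin("mass", ..., 60, 60, 120)


@dataclass
class MassHistogram:
    counts: List[float] = field(default_factory=lambda: [0.0] * N_BINS)
    sumw: Dict[str, float] = field(default_factory=dict)

    def identity(self) -> "MassHistogram":
        # An empty accumulator with the same shape
        return MassHistogram()

    def fill(self, masses: Iterable[float]) -> None:
        width = (HIGH - LOW) / N_BINS
        for m in masses:
            if LOW <= m < HIGH:  # out-of-range values are dropped; no overflow bins here
                self.counts[int((m - LOW) // width)] += 1.0

    def __add__(self, other: "MassHistogram") -> "MassHistogram":
        # Bin-by-bin sum of the counts, key-by-key sum of sumw
        out = MassHistogram(counts=[a + b for a, b in zip(self.counts, other.counts)])
        for key in set(self.sumw) | set(other.sumw):
            out.sumw[key] = self.sumw.get(key, 0.0) + other.sumw.get(key, 0.0)
        return out


TEMPLATE = MassHistogram()


def process_file(masses: List[float]) -> MassHistogram:
    """Stand-in for ZMassProcessor.process on one file's worth of pair masses."""
    output = TEMPLATE.identity()
    output.sumw["Zee"] = float(len(masses))
    output.fill(masses)
    return output


# Two "files" processed independently, then merged
total = process_file([90.5, 91.2, 130.0]) + process_file([89.9, 91.7])
print(total.sumw["Zee"], sum(total.counts))
```

Since addition is associative, the files can finish in any order and the merged result is the same.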

# Run through dask

First, create the data stream from ServiceX.

In [6]:
servicex_data = sx_event_stream(did, leptons_per_event_query)

Create a local `dask` cluster. We use 10 workers here; this number was not tuned, and there is probably a better way to choose it.

In [7]:
from dask.distributed import Client
client = Client(n_workers=10, asynchronous=True)

Now run each file produced by ServiceX through the processor defined above.

In [8]:
accumulated_results = process_coffea_dask(servicex_data,
                                          ZMassProcessor(),
                                          client)
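`process_coffea_dask` comes from the local `sx_multi` module, whose source is not shown here. Judging from how its result is consumed below (with `async for`), it appears to yield a running total as each file finishes. The following is a minimal sketch of how such a function could work using only the standard library: files arrive from an async stream, each one is handed to a thread pool as soon as it appears, and the partial outputs are summed as they complete. All names here (`fake_file_stream`, `process_one`, `stream_accumulate`) are hypothetical; with `dask`, the cluster's workers would take the place of the local threads.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import AsyncIterator, Callable, Dict


async def fake_file_stream(n_files: int) -> AsyncIterator[str]:
    """Stand-in for the ServiceX stream: yields file names as they become ready."""
    for i in range(n_files):
        await asyncio.sleep(0.01)  # pretend each file takes a moment to transform
        yield f"file_{i}.root"


def process_one(path: str) -> Dict[str, int]:
    """Stand-in for running the processor on a single file."""
    return {"n_files": 1, "n_events": 100}


def merge(a: Dict[str, int], b: Dict[str, int]) -> Dict[str, int]:
    return {k: a.get(k, 0) + b.get(k, 0) for k in set(a) | set(b)}


async def stream_accumulate(files: AsyncIterator[str],
                            work: Callable[[str], Dict[str, int]],
                            max_workers: int = 10) -> AsyncIterator[Dict[str, int]]:
    """Submit each file on arrival; yield the running total after each completion."""
    loop = asyncio.get_running_loop()
    total: Dict[str, int] = {}
    pending = set()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        async for path in files:
            pending.add(loop.run_in_executor(pool, work, path))
            # Report any files that have already finished, without blocking
            finished = {f for f in pending if f.done()}
            for f in finished:
                total = merge(total, f.result())
                yield dict(total)
            pending -= finished
        # The stream is exhausted: drain the remaining work as it completes
        for f in asyncio.as_completed(pending):
            total = merge(total, await f)
            yield dict(total)


async def main():
    return [t async for t in stream_accumulate(fake_file_stream(5), process_one, max_workers=3)]


updates = asyncio.run(main())
print(updates[-1])
```

In a notebook, where an event loop is already running, use `await main()` instead of `asyncio.run(main())`.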

Plot the results. The goal is to update the plot as each new result arrives, but it is not yet clear how to do that.

In [9]:
async def plot_stream(accumulator_stream):
    coffea_info = None  # stays None if the stream yields nothing
    async for coffea_info in accumulator_stream:
        # Need to ask the coffea folks how to animate this!
        hist.plot1d(coffea_info['mass'])
        plt.show()
    return coffea_info

await plot_stream(accumulated_results)

ServiceXException: (ServiceXException(...), 'ServiceX access token request rejected: 504')
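One common way to redraw a figure in place in Jupyter is IPython's `clear_output(wait=True)`, which clears the cell's output only once new output is ready, so the histogram appears to update as results arrive. The following is a minimal sketch, not tested against `coffea`, assuming the stream yields the running output dictionary as in the cell above. The redraw step is passed in as a callback (`redraw` and `notebook_redraw` are hypothetical names) so the loop can also be exercised outside a notebook.

```python
from typing import Any, AsyncIterator, Callable, Optional


def notebook_redraw(coffea_info: Any) -> None:
    """Clear the cell output, then draw the latest histogram (needs IPython, matplotlib, coffea)."""
    from IPython.display import clear_output
    import matplotlib.pyplot as plt
    from coffea import hist

    clear_output(wait=True)  # the old figure is removed only when the new one is drawn
    hist.plot1d(coffea_info["mass"])
    plt.show()


async def plot_stream_live(accumulator_stream: AsyncIterator[Any],
                           redraw: Callable[[Any], None] = notebook_redraw) -> Optional[Any]:
    """Redraw once per update from the stream and return the last result (None if empty)."""
    coffea_info = None
    async for coffea_info in accumulator_stream:
        redraw(coffea_info)
    return coffea_info
```

In the notebook this would be called as `final = await plot_stream_live(accumulated_results)`.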

In this demo, the data is produced on UChicago's `river` cluster and downloaded to the machine running this notebook (for the run above, a house in Canada). Watching my network connection, it is not saturated even with 10 workers, and this approach is definitely slower than the `funcx` version. However, if a `dask` cluster were running on `river`, one could connect to it instead, and the same efficiencies seen with `funcx` should be seen here.