# Integrating Coffea and Work Queue

### What is Coffea?
Coffea is a prototype package for pulling together all the typical needs of a high-energy collider physics (HEP) experiment analysis using the scientific Python ecosystem.

https://coffeateam.github.io/coffea/index.html

### What is Work Queue?
Work Queue is a framework for building large master-worker applications that span thousands of machines drawn from clusters, clouds, and grids.

https://cctools.readthedocs.io/en/latest/work_queue/

### Why Work Queue?

- **Elastic:** Workers can be added and removed at runtime, and the master automatically uses whichever workers are available.
- **Robust:** If a worker fails, the tasks it was running are automatically detected and rescheduled on other workers.
- **Efficient:** Files can be cached at the workers, which reduces transfer times and network utilization.
- **Resource Management:** Resources such as cores, memory, and disk are measured automatically, so that as many tasks as possible run on the available workers.
- **Agnostic:** Masters can be written in Python, Perl, or C, and workers can run on existing cluster and cloud systems: HTCondor, PBS, TORQUE, Slurm, and EC2.

### Setting up the conda environment for the first time

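In a separate terminal on the same machine as this notebook, run the following commands to set up the conda environment; this only needs to be done once. Run them from the directory the notebook runs in, because `executor_args` later refers to the packed environment `workqueue-coffea.tar.gz` by a relative path.

Once the installation is complete, change the kernel of your Jupyter notebook to `workqueue-coffea`.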

```
$ conda create --name workqueue-coffea python=3.8 six dill
$ conda activate workqueue-coffea
$ conda install -c conda-forge xrootd ndcctools anaconda ipykernel
$ pip install coffea
$ python -m ipykernel install --user --name=workqueue-coffea
$ conda activate base
$ pip install conda-pack
$ python -c 'import conda_pack; conda_pack.pack(name="workqueue-coffea", output="workqueue-coffea.tar.gz")'
```

### A note on distributed systems
Whenever we run a task on a remote worker, we have to send along every file and program the task needs, because we cannot assume the remote machine has our software installed. This is why we packed the conda environment into `workqueue-coffea.tar.gz` above: that archive is shipped to the workers so our tasks can run inside the same environment as this notebook.

### Getting Started

In [1]:
```python
# Import libraries
from coffea import hist
from coffea.analysis_objects import JaggedCandidateArray
import coffea.processor as processor
from coffea.processor import iterative_executor, work_queue_executor
from awkward import JaggedArray
import numpy as np
from time import time
```

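### FancyDimuonProcessor
This processor comes from the "Getting fancy" section of the Coffea processor documentation:
https://coffeateam.github.io/coffea/notebooks/processor.html#Getting-fancy

It keeps muons passing the soft ID, selects events with at least two such muons, and forms all distinct muon pairs. Events must then contain exactly two opposite-charge pairs with invariant mass above 35 GeV. For those events it fills histograms of the combined mass of the two pairs, the pair masses nearest to and farthest from the Z mass, and the leading and trailing muon transverse momentum within each pair, and it records a cutflow of how many events pass each step.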

In [2]:
```python
class FancyDimuonProcessor(processor.ProcessorABC):
    def __init__(self):
        # Histogram axes: dataset category, dimuon mass, and muon pT
        dataset_axis = hist.Cat("dataset", "Primary dataset")
        mass_axis = hist.Bin("mass", r"$m_{\mu\mu}$ [GeV]", 600, 0.25, 300)
        pt_axis = hist.Bin("pt", r"$p_{T,\mu}$ [GeV]", 3000, 0.25, 300)

        # Everything returned by process() lives in this accumulator
        self._accumulator = processor.dict_accumulator({
            'mass': hist.Hist("Counts", dataset_axis, mass_axis),
            'mass_near': hist.Hist("Counts", dataset_axis, mass_axis),
            'mass_far': hist.Hist("Counts", dataset_axis, mass_axis),
            'pt_lead': hist.Hist("Counts", dataset_axis, pt_axis),
            'pt_trail': hist.Hist("Counts", dataset_axis, pt_axis),
            'cutflow': processor.defaultdict_accumulator(int),
        })

    @property
    def accumulator(self):
        return self._accumulator

    def process(self, df):
        # Start each chunk from an empty copy of the accumulator
        output = self.accumulator.identity()

        dataset = df['dataset']
        # Jagged array of muon candidates (a variable number per event)
        muons = JaggedCandidateArray.candidatesfromcounts(
            df['nMuon'],
            pt=df['Muon_pt'],
            eta=df['Muon_eta'],
            phi=df['Muon_phi'],
            mass=df['Muon_mass'],
            charge=df['Muon_charge'],
            softId=df['Muon_softId'],
            tightId=df['Muon_tightId']
            )

        output['cutflow']['all events'] += muons.size

        # Keep only muons passing the soft ID
        soft_id = (muons.softId > 0)
        muons = muons[soft_id]
        output['cutflow']['soft id'] += soft_id.any().sum()

        # Require at least two muons per event
        twomuons = (muons.counts >= 2)
        output['cutflow']['two muons'] += twomuons.sum()

        # Form all distinct muon pairs in each event
        dimuons = muons[twomuons].distincts()

        twodimuons = (dimuons.counts >= 2)
        output['cutflow']['>= two dimuons'] += twodimuons.sum()
        dimuons = dimuons[twodimuons]

        # Keep opposite-charge pairs only
        opposite_charge = (dimuons.i0['charge'] * dimuons.i1['charge'] == -1)

        dimuons = dimuons[opposite_charge]
        output['cutflow']['opposite charge'] += opposite_charge.any().sum()

        # Keep pairs with invariant mass above 35 GeV
        mass_35GeV = (dimuons.mass > 35)
        dimuons = dimuons[mass_35GeV]

        exactlytwodimuons = (dimuons.counts == 2)
        output['cutflow']['== two dimuons'] += exactlytwodimuons.sum()
        dimuons = dimuons[exactlytwodimuons].compact()

        # Within each pair, sort the two muons into leading and trailing by pT
        leading_mu = (dimuons.i0.pt.content > dimuons.i1.pt.content)
        pt_lead = JaggedArray.fromoffsets(dimuons.offsets, np.where(leading_mu,
                                            dimuons.i0.pt.content, dimuons.i1.pt.content))
        pt_trail = JaggedArray.fromoffsets(dimuons.offsets, np.where(~leading_mu,
                                            dimuons.i0.pt.content, dimuons.i1.pt.content))

        # Per event, index of the pair closest to / farthest from the Z mass
        z_mass = 91.1876  # GeV (PDG value)
        near_z = np.abs(dimuons.mass - z_mass).argmin()
        far_z = np.abs(dimuons.mass - z_mass).argmax()

        output['mass'].fill(dataset=dataset,
                            mass=dimuons.p4.sum().mass)
        output['mass_near'].fill(dataset=dataset,
                                 mass=dimuons.mass[near_z].flatten())
        output['mass_far'].fill(dataset=dataset,
                                mass=dimuons.mass[far_z].flatten())
        output['pt_lead'].fill(dataset=dataset,
                               pt=pt_lead.flatten())
        output['pt_trail'].fill(dataset=dataset,
                                pt=pt_trail.flatten())
        return output

    def postprocess(self, accumulator):
        return accumulator
```
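The near/far selection in `process` picks, in each event, the dimuon pair whose mass is closest to (argmin) or farthest from (argmax) the Z mass. The plain-Python sketch below shows that per-event logic for a single event; the helper `near_far_z`, the example masses, and the Z mass of 91.1876 GeV (the PDG value) are illustrative assumptions, not part of Coffea.

```python
Z_MASS = 91.1876  # GeV, PDG value (assumed for this illustration)


def near_far_z(masses):
    """Hypothetical helper: return (nearest, farthest) pair mass to the Z for one event.

    Mirrors the per-event argmin/argmax over |m - m_Z| done with jagged arrays
    in the processor; ties resolve to the first pair, like argmin/argmax.
    """
    if not masses:
        raise ValueError("event has no dimuon candidates")
    distances = [abs(m - Z_MASS) for m in masses]
    near = masses[distances.index(min(distances))]
    far = masses[distances.index(max(distances))]
    return near, far


# Hypothetical event with two selected dimuon pairs (masses in GeV).
print(near_far_z([88.4, 41.7]))  # (88.4, 41.7)
```

The jagged-array version in the processor does the same thing for every event at once, which is why it indexes `dimuons.mass` with the per-event indices and then flattens.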

### Setting the variables we need

First, we set the chunk size: the number of entries from a file that are processed together in a single task. A larger chunk size yields fewer, larger tasks. With the chunk size used here, the ZZ sample is split into 50 tasks.
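The relationship between chunk size and task count can be sketched as a ceiling division per file. This assumes simple fixed-size chunking (Coffea's own splitting may place boundaries differently), and the entry count of 1.5 million is an illustrative value, not one read from `ZZTo4mu.root`.

```python
import math


def n_tasks(entries_per_file, chunksize):
    """Number of tasks if each file is cut into chunks of at most `chunksize` entries.

    Assumes fixed-size chunking per file; chunks never span two files.
    """
    if chunksize <= 0:
        raise ValueError("chunksize must be positive")
    return sum(math.ceil(n / chunksize) for n in entries_per_file)


# Hypothetical single file with about 1.5 million entries.
files = [1_500_000]
print(n_tasks(files, 30_000))   # 50
print(n_tasks(files, 100_000))  # 15
```

Because the count is rounded up per file, adding more files to the fileset adds at least one task per file, even for very small files.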

Next, we set the fileset to process. The files are the ones used in the Coffea example: https://coffeateam.github.io/coffea/notebooks/processor.html#Getting-fancy

Lastly, we set the arguments passed to the executor. Most of these are specific to `work_queue_executor`:
- **cores/disk/memory:** The resources each task is declared to use (disk and memory in MB). Declaring them is optional, but accurate values let Work Queue run as many tasks at once as each worker can actually hold.
- **resource-monitor:** Enables the resource monitor, a tool that measures the computational resources used by a process and all of its descendants: https://cctools.readthedocs.io/en/latest/resource_monitor/
- **port:** The port on which the master listens for connections from workers. 0 lets Work Queue choose an available port.
- **environment-file:** The packed conda environment that is sent to each worker so that it can run Coffea.
- **master-name:** The name under which the master registers with the Work Queue catalog server. Workers started with `-M workqueue-coffea` use this name to find and connect to the master.

In [3]:
```python
chunk_size = 30000  # default is 100000

fileset = {  # files from the "Fancy" processor example on the Coffea site
    # 'DoubleMuon': [
    #     'root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root',
    #     'root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012C_DoubleMuParked.root',
    # ],
    'ZZ to 4mu': [
        'root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/ZZTo4mu.root'
    ]
}

executor_args = {
    'debug': True,
    'flatten': True,       # used by all executors
    'compression': 0,      # used by all executors
    'cores': 2,
    'disk': 1000,          # MB
    'memory': 2000,        # MB
    'resource-monitor': True,
    'port': 0,
    'environment-file': 'workqueue-coffea.tar.gz',  # the conda environment packed at the beginning
    'master-name': 'workqueue-coffea',
    'print-stdout': True,
}
```

### Run Coffea - Iterative Executor

Here we run the analysis with `iterative_executor`, which splits the data into chunks and processes them one after another in the notebook's own process.
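Conceptually, an executor maps the processor over chunks and merges the partial outputs. The sketch below illustrates that map-then-merge pattern for the sequential case. It uses `collections.Counter` as a stand-in for Coffea's accumulators and a hypothetical `process_chunk` in place of `FancyDimuonProcessor.process`; it is not Coffea's implementation.

```python
from collections import Counter


def process_chunk(chunk):
    """Hypothetical stand-in for a processor's process(): fill a tiny cutflow.

    `chunk` is a list of per-event muon counts.
    """
    out = Counter()
    out["all events"] += len(chunk)
    out["two muons"] += sum(1 for n_muons in chunk if n_muons >= 2)
    return out


def run_iterative(chunks):
    """Process chunks one after another and merge their outputs by addition."""
    total = Counter()
    for chunk in chunks:                    # strictly sequential, in one process
        total.update(process_chunk(chunk))  # merge partial results, like adding accumulators
    return total


# Three hypothetical chunks; each event is represented by its number of muons.
chunks = [[0, 2, 3], [1, 4], [2]]
print(dict(run_iterative(chunks)))  # {'all events': 6, 'two muons': 4}
```

The Work Queue executor follows the same shape, except that each chunk becomes its own task on a remote worker (the `item_N.p` and `output_N.p` files in the task log below), and the returned outputs are merged in the notebook.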

In [4]:
```python
iterative_start = time()

# Run using the iterative executor
output_iterative = processor.run_uproot_job(fileset,
                                            treename='Events',
                                            processor_instance=FancyDimuonProcessor(),
                                            executor=iterative_executor,
                                            executor_args={'flatten': True},
                                            chunksize=chunk_size)

iterative_time = time() - iterative_start
```

[Progress bars: Preprocessing (1 item), Processing (50 chunks)]

### Start Workers

Before we run Coffea with Work Queue, we need to start some workers.

To start a single worker, open a terminal on the same machine and in the same conda environment as this notebook, and run the following command:

`$ work_queue_worker -M workqueue-coffea`

To start 10 workers through a batch system, use one of the following commands:
- **Condor:** `$ condor_submit_workers -M workqueue-coffea --cores 4 --memory 4000 --disk 2000 10`
- **SGE:** `$ sge_submit_workers -M workqueue-coffea --cores 4 --memory 4000 --disk 2000 10`
- **PBS:** `$ pbs_submit_workers -M workqueue-coffea --cores 4 --memory 4000 --disk 2000 10`
- **Torque:** `$ torque_submit_workers -M workqueue-coffea --cores 4 --memory 4000 --disk 2000 10`
- **Slurm:** `$ slurm_submit_workers -M workqueue-coffea --cores 4 --memory 4000 --disk 2000 10`
- **EC2:** `$ ec2_submit_workers -M workqueue-coffea --cores 4 --memory 4000 --disk 2000 10`

Further information can be found here: https://cctools.readthedocs.io/en/latest/work_queue/#running-a-work-queue-application
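The worker size requested above and the per-task request in `executor_args` together determine how many tasks can run at once. The sketch below is a simplified model, assuming tasks are packed purely by declared cores, memory, and disk (the scarcest resource decides) and that all tasks take about the same time; it ignores anything else Work Queue's scheduler takes into account.

```python
import math


def slots_per_worker(worker, task):
    """How many tasks with the declared `task` resources fit on one worker at once.

    Simplified model: integer division per resource, limited by the scarcest one.
    """
    return min(worker[name] // task[name] for name in task)


# Worker size from the submit commands above; per-task request from executor_args.
worker = {"cores": 4, "memory": 4000, "disk": 2000}  # memory and disk in MB
task = {"cores": 2, "memory": 2000, "disk": 1000}    # memory and disk in MB

per_worker = slots_per_worker(worker, task)
total_slots = per_worker * 10          # 10 workers requested
waves = math.ceil(50 / total_slots)    # 50 tasks, assuming equal task durations
print(per_worker, total_slots, waves)  # 2 20 3
```

Here cores, memory, and disk all limit a worker to two tasks, so changing only one of the three requests would not let more tasks share a worker.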

In [5]:
```
!condor_submit_workers -M workqueue-coffea --cores 4 --memory 4000 --disk 2000 10
```

Creating worker submit scripts in /tmp/ccarball-workers...
Submitting job(s)..........
10 job(s) submitted to cluster 439133.


### Run Coffea - Work Queue Executor

In [6]:
```python
wq_start = time()

# Run using the Work Queue executor
output_work_queue = processor.run_uproot_job(fileset,
                                             treename='Events',
                                             processor_instance=FancyDimuonProcessor(),
                                             executor=work_queue_executor,
                                             executor_args=executor_args,
                                             chunksize=chunk_size)

wq_time = time() - wq_start
```

Listening for work queue workers on port 1025...


[Progress bar: Processing (50 chunks)]

Submitted task (id #1): ./python_package_run --environment workqueue-coffea.tar.gz --unpack-to "$WORK_QUEUE_SANDBOX"/workqueue-coffea.tar.gz-env python fn_as_file function.p item_0.p output_0.p
Submitted task (id #2): ./python_package_run --environment workqueue-coffea.tar.gz --unpack-to "$WORK_QUEUE_SANDBOX"/workqueue-coffea.tar.gz-env python fn_as_file function.p item_1.p output_1.p
Submitted task (id #3): ./python_package_run --environment workqueue-coffea.tar.gz --unpack-to "$WORK_QUEUE_SANDBOX"/workqueue-coffea.tar.gz-env python fn_as_file function.p item_2.p output_2.p
Submitted task (id #4): ./python_package_run --environment workqueue-coffea.tar.gz --unpack-to "$WORK_QUEUE_SANDBOX"/workqueue-coffea.tar.gz-env python fn_as_file function.p item_3.p output_3.p
Submitted task (id #5): ./python_package_run --environment workqueue-coffea.tar.gz --unpack-to "$WORK_QUEUE_SANDBOX"/workqueue-coffea.tar.gz-env python fn_as_file function.p item_4.p output_4.p
Submitted task (id #6): ./pyth

Task (id #1) complete: ./python_package_run --environment workqueue-coffea.tar.gz --unpack-to "$WORK_QUEUE_SANDBOX"/workqueue-coffea.tar.gz-env python fn_as_file function.p item_0.p output_0.p (return code 0)
Output:

allocated cores: 2, memory: 2000 MB, disk: 1000 MB
measured cores: 2, memory: 115 MB, disk 990 MB, runtime 36.103177
Task (id #2) complete: ./python_package_run --environment workqueue-coffea.tar.gz --unpack-to "$WORK_QUEUE_SANDBOX"/workqueue-coffea.tar.gz-env python fn_as_file function.p item_1.p output_1.p (return code 0)
Output:

allocated cores: 2, memory: 2000 MB, disk: 1000 MB
measured cores: 2, memory: 107 MB, disk 990 MB, runtime 42.898914
Task (id #3) complete: ./python_package_run --environment workqueue-coffea.tar.gz --unpack-to "$WORK_QUEUE_SANDBOX"/workqueue-coffea.tar.gz-env python fn_as_file function.p item_2.p output_2.p (return code 0)
Output:

allocated cores: 2, memory: 2000 MB, disk: 1000 MB
measured cores: 2, memory: 105 MB, disk 990 MB, runtime 38.30

Task (id #28) complete: ./python_package_run --environment workqueue-coffea.tar.gz --unpack-to "$WORK_QUEUE_SANDBOX"/workqueue-coffea.tar.gz-env python fn_as_file function.p item_27.p output_27.p (return code 0)
Output:

allocated cores: 2, memory: 2000 MB, disk: 1000 MB
measured cores: 1, memory: 122 MB, disk 990 MB, runtime 10.573684
Task (id #25) complete: ./python_package_run --environment workqueue-coffea.tar.gz --unpack-to "$WORK_QUEUE_SANDBOX"/workqueue-coffea.tar.gz-env python fn_as_file function.p item_24.p output_24.p (return code 0)
Output:

allocated cores: 2, memory: 2000 MB, disk: 1000 MB
measured cores: 1, memory: 129 MB, disk 990 MB, runtime 11.925775
Task (id #22) complete: ./python_package_run --environment workqueue-coffea.tar.gz --unpack-to "$WORK_QUEUE_SANDBOX"/workqueue-coffea.tar.gz-env python fn_as_file function.p item_21.p output_21.p (return code 0)
Output:

allocated cores: 2, memory: 2000 MB, disk: 1000 MB
measured cores: 2, memory: 109 MB, disk 990 MB, runt
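Each task report pairs the resources allocated to the task with what was measured. For the tasks shown, memory use (105 to 129 MB) is far below the 2000 MB requested, while disk use (990 MB) is close to the 1000 MB request. The sketch below extracts those numbers from the log text. The regular expressions are written against the exact line format printed above; they are an assumption about that text format, not a Work Queue API.

```python
import re

# Report lines copied from the task output above (two tasks).
report = """\
allocated cores: 2, memory: 2000 MB, disk: 1000 MB
measured cores: 2, memory: 115 MB, disk 990 MB, runtime 36.103177
allocated cores: 2, memory: 2000 MB, disk: 1000 MB
measured cores: 1, memory: 122 MB, disk 990 MB, runtime 10.573684
"""

# The measured lines print "disk 990 MB" without a colon, so the colon is optional.
ALLOCATED = re.compile(r"allocated cores: (\d+), memory: (\d+) MB, disk:? (\d+) MB")
MEASURED = re.compile(r"measured cores: (\d+), memory: (\d+) MB, disk:? (\d+) MB")
NAMES = ("cores", "memory", "disk")


def usage_fractions(text):
    """Return measured/allocated fractions per task, pairing report lines in order."""
    allocated = [tuple(int(g) for g in m.groups()) for m in ALLOCATED.finditer(text)]
    measured = [tuple(int(g) for g in m.groups()) for m in MEASURED.finditer(text)]
    return [
        {name: used / asked for name, asked, used in zip(NAMES, alloc, meas)}
        for alloc, meas in zip(allocated, measured)
    ]


for fractions in usage_fractions(report):
    print({name: f"{value:.0%}" for name, value in fractions.items()})
# {'cores': '100%', 'memory': '6%', 'disk': '99%'}
# {'cores': '50%', 'memory': '6%', 'disk': '99%'}
```

The memory request could likely be lowered, but with 2 of a worker's 4 cores and half of its disk requested per task, lowering memory alone would not let more tasks share a worker in this setup.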

### A note on conda environments

On each worker that runs tasks, the packed conda environment has to be unpacked and set up before tasks can run inside it on the remote machine. This setup can take anywhere from 10 seconds to a few minutes, and a worker cannot start running tasks until it has finished. In addition, every task checks whether the environment is already set up on its worker, which adds a few seconds to each task's runtime even when it is.
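The "set up once, check on every task" pattern can be sketched with a marker file: the expensive unpack runs only when the marker is missing, and every later task pays only for a cheap existence check. The function `ensure_environment` and the `.unpacked` marker are hypothetical; this is not the actual logic of `python_package_run`.

```python
import tarfile
import tempfile
from pathlib import Path


def ensure_environment(tarball, dest):
    """Unpack `tarball` into `dest` unless a marker file says it is already done.

    Hypothetical check-once sketch: returns True if it unpacked, False if it
    only performed the cheap check.
    """
    dest = Path(dest)
    marker = dest / ".unpacked"
    if marker.exists():
        return False                # fast path: environment already set up
    dest.mkdir(parents=True, exist_ok=True)
    with tarfile.open(tarball) as tar:
        tar.extractall(dest)        # slow path: the expensive setup step
    marker.touch()                  # written last, so an interrupted unpack is redone
    return True


# Demonstration with a tiny throwaway archive standing in for the conda environment.
with tempfile.TemporaryDirectory() as tmp:
    payload = Path(tmp, "hello.txt")
    payload.write_text("hello")
    archive = Path(tmp, "env.tar.gz")
    with tarfile.open(archive, "w:gz") as tar:
        tar.add(payload, arcname="hello.txt")
    env_dir = Path(tmp, "env")
    print(ensure_environment(archive, env_dir))  # True: the first task unpacks
    print(ensure_environment(archive, env_dir))  # False: later tasks only check
```

If two tasks start at the same moment on the same worker, both could see the marker missing and unpack concurrently; a real implementation would need a lock or an atomic rename to avoid that.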

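### Clean Up
We started workers in the Condor pool, so we should remove them once we are done to let other people use those resources. The command below removes all Condor jobs belonging to user `ccarball`; to remove only these workers, pass the cluster ID reported at submission instead (here, `condor_rm 439133`).

In [7]:
```
!condor_rm ccarball
```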

All jobs of user "ccarball" have been marked for removal


### Comparison

In [8]:
```python
speedup = iterative_time / wq_time
```

In [9]:
```python
print("Iterative Time: ", iterative_time)
print("Work Queue Time: ", wq_time)
print("Speedup: ", speedup)
```

Iterative Time:  172.90614676475525
Work Queue Time:  329.9537396430969
Speedup:  0.5240314807517675
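A speedup below 1 means the Work Queue run was slower than the iterative run. The sketch below turns the printed timings into a slowdown factor and an average in-process cost per chunk; `compare_runs` is a hypothetical helper, and 50 is the chunk count from the Processing progress bar. Since the iterative time also includes preprocessing, the per-chunk figure is an upper bound.

```python
def compare_runs(serial_s, parallel_s, n_chunks):
    """Summarize a serial vs. parallel timing; speedup < 1 means the parallel run was slower."""
    speedup = serial_s / parallel_s
    return {
        "speedup": speedup,
        "slowdown": 1 / speedup,                    # how many times longer the parallel run took
        "serial_s_per_chunk": serial_s / n_chunks,  # average cost of one chunk run in-process
    }


# Timings printed above; 50 chunks.
summary = compare_runs(172.90614676475525, 329.9537396430969, 50)
print({k: round(v, 2) for k, v in summary.items()})
# {'speedup': 0.52, 'slowdown': 1.91, 'serial_s_per_chunk': 3.46}
```

At roughly 3.5 seconds of work per chunk in-process, while the Work Queue task reports show runtimes of about 10 to 43 seconds, the numbers suggest that per-task overhead, rather than the analysis itself, dominates at this chunk size.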


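### Finishing Up
In this run the Work Queue executor was slower than the iterative executor: a speedup of about 0.52 means it took roughly 1.9 times as long. The environment setup overhead described above likely accounts for part of that. Coffea plus Work Queue is still relatively new and has a long way to go in terms of performance, so stay tuned for more updates!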