# High Energy Physics analysis using Coffea + Work Queue Executor

[Coffea](https://coffeateam.github.io/coffea) is a prototype framework for pulling together all the typical needs of HEP experiment analysis using the scientific python ecosystem. In Coffea, you write processors that analyse sets of events in parallel, and accumulators that merge the results together (e.g., into histograms). This tutorial assumes that you are familiar with Coffea.

[Work Queue](https://cctools.readthedocs.io/en/stable/work_queue/) is a system and library for creating and managing scalable manager-worker style programs that scale up to thousands of machines on clusters, clouds, and grids. With Work Queue, Coffea sends the work to be done to a set of Work Queue workers. These workers may be running locally, remotely in a cluster, or in some container environment.

When used together with Coffea, Work Queue can measure the resources, such as cores and memory, that chunks of events need, and adapt the allocations automatically to maximize throughput. In this notebook we demonstrate how the executor can dynamically reduce the size of the chunks of events when the available memory is not enough, and how it can adapt the chunk size to a desired resource usage. But first, let's introduce its basic use for small local runs.

<img src="https://github.com/cooperative-computing-lab/coffea-wq-notebook/blob/master/images/coffea-wq-general.png?raw=1" title="Coffea Work Queue basic architecture" style="display:block; margin:auto;"/>

## Installation

### From binder

You should be set already!

(But given mybinder memory constraints, you may not be able to run the more interesting examples.)

In [None]:
# read the resource limits that mybinder gave us
try:
    import os
    os.environ["DISK"] = str(10000) # MB
    os.environ["CORES"] = os.environ["CPU_LIMIT"]
    os.environ["MEMORY"] = str(int(os.environ["MEM_LIMIT"])/1e6)  # B to MB
except KeyError:
    pass

### Inside Google Colab

In [None]:
%%script echo skipping
# if in Google Colab, comment out the line above!

# first we need to install conda
!pip install -q condacolab
import condacolab
condacolab.install()

# install the needed packages using mamba,
# as it solves the dependency tree much faster
!conda install -q -y -c conda-forge mamba
!mamba install -q -y coffea=0.7.18 ndcctools=7.4.13

# If using google colab, we need to get one of the 
# test processors
!wget -q https://raw.githubusercontent.com/cooperative-computing-lab/coffea-wq-notebook/final-talk/MemNanoTestProcessor.py

### From the shell with a conda environment file

environment.yml:

```yaml
name: coffea-wq-notebook
channels:
  - conda-forge
dependencies:
  - coffea
  - jupyterlab
  - ndcctools
  - conda
  - python=3.9
```

shell command:
```sh
conda env create --file environment.yml
conda activate coffea-wq-notebook
jupyter-lab index.ipynb
```

## Load coffea

In [None]:
# all coffea applications are run by coffea.processor.Runner
from coffea.processor import Runner
from coffea.processor import WorkQueueExecutor

# workers will be created using this Factory
from work_queue import Factory

In [None]:
# Use the test processor included in coffea:
from coffea.processor.test_items import NanoTestProcessor as MyProcessor

In [None]:
# Define example input file. 

filelist = {
    "ttHJet": ["http://www.crc.nd.edu/~kmohrman/files/root_files/for_ci/ttHJet_UL17_R1B14_NAOD-00000_10194.root"]
}

## Basic configuration of the WorkQueueExecutor

In [None]:
# For the most basic Work Queue configuration, we need to set the port that
# the workers will use to communicate with the Coffea application.
# The default for Work Queue is 9123, so we will use that here.
# We can set Work Queue to find a free port for use, and we will explore
# that later in the notebook.
port = 9123
executor = WorkQueueExecutor(port=port)
runner = Runner(executor)

## Processing one chunk of events at a time

In Coffea, the iterative executor processes one set of events at a time. We can emulate this by directing the factory that creates workers to only create one worker. The factory automatically creates workers as it sees work available up to `max_workers`.

In [None]:
# Create a factory that looks for a running Coffea in the local machine at the
# port we defined above.
workers = Factory("local", manager_host_port = f"localhost:{port}")

# Tell the factory that it should not create more than one worker...
workers.max_workers = 1

# ...and that the workers should be requested with 1 core
# If we do not set resource limits, a worker automatically configures to
# use all available in a machine.
# Since we are using "local", workers are created in the local machine
# and trust that any resource specifications make sense.
# For other batch systems, e.g., condor, these resource specifications
# are used in the submit file.
#workers.memory = 32000 # in MB
#workers.disk = 32000  # in MB
workers.cores = 1

In [None]:
# Finally, we execute the Coffea application. By using the factory inside a
# "with" statement, the factory shuts down the workers when the workflow finishes.

with workers:
    hists = runner(filelist, "Events", MyProcessor())

<div class="alert alert-success"> Note how all of the resources of the worker (seen in "largest seen") are allocated (seen in "current allocation") to a task. Also note that the tasks are using little of the memory and disk allocated. </div>

## Manually declaring resources

As we saw above, each task (that is, each processed chunk) used at most 1 core, and only a small portion of the memory and disk available. Thus, if our worker has more than 1 core, it could run more than one task concurrently. This can be achieved by explicitly specifying how many resources a task should use. Note how Work Queue automatically allocates to the tasks a corresponding proportion of the resources that we did not specify (i.e., memory and disk):

In [None]:
executor = WorkQueueExecutor(port=port, cores=1)
runner = Runner(executor)

workers = Factory("local", manager_host_port = f"localhost:{port}")

workers.max_workers = 1

# Declare that each local worker has 2 cores
# (just for testing, probably not true)
workers.cores = 2
with workers:
    hists = runner(filelist, "Events", MyProcessor())

<div class="alert alert-success"> Note how each task gets allocated (current allocation) 1 core, and that the rest of the resources of the worker (seen in "largest seen") are divided among the cores. </div>

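To make the proportional split concrete, here is a minimal sketch of the arithmetic. It assumes a simplified model in which a task receives, for each undeclared resource, the same fraction of the worker as the largest fraction it declared. The function `proportional_allocation` and the worker sizes are hypothetical and for illustration only; Work Queue's own computation may round or cap values differently.

```python
def proportional_allocation(worker, declared):
    """Fill in undeclared task resources in proportion to the declared ones.

    worker:   total resources of a worker, e.g. {"cores": 2, "memory": 8000}
    declared: resources explicitly requested for a task, e.g. {"cores": 1}
    """
    if not declared:
        # Nothing declared: the task gets the whole worker.
        return dict(worker)

    # Largest fraction of the worker requested across the declared resources.
    fraction = max(declared[name] / worker[name] for name in declared)

    allocation = {}
    for name, total in worker.items():
        if name in declared:
            allocation[name] = declared[name]
        else:
            allocation[name] = int(total * fraction)
    return allocation


# Hypothetical worker: 2 cores, 8000 MB of memory, 10000 MB of disk.
worker = {"cores": 2, "memory": 8000, "disk": 10000}
print(proportional_allocation(worker, {"cores": 1}))
# {'cores': 1, 'memory': 4000, 'disk': 5000}
```

In this model, declaring 1 of 2 cores gives each task half of the memory and disk, so two tasks fit in the worker at the same time.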
## Automatic resource management

The previous example worked because we knew that each task did not use more than one core. Since we do not always know how many resources a function will need, the Work Queue executor automatically adjusts its allocations according to previously measured values. Preprocessing, processing, and accumulating tasks are handled separately.

In fact, this is the default mode for the executor. We did not observe it in the previous examples because, with the default Coffea chunksize (100k events), there were not enough tasks for Work Queue to determine an allocation. In the absence of data, Work Queue allocates a whole worker to a task. Thus, for the automatic resource management example, we set the chunksize to a small value to create many tasks. Later in this notebook we will show how Work Queue can also adapt the chunksize to better use the resource allocations.

In the output below, note how the `current allocation` of processing tasks eventually adapts to 1 core, even though there is no mention of cores when declaring the executor.

Note that the automatic resource management depends on measuring the resources used by Coffea tasks, and this only works on Linux.

In [None]:
executor = WorkQueueExecutor(port=port)
runner = Runner(executor, chunksize=25000)

workers = Factory("local", manager_host_port = f"localhost:{port}")
workers.max_workers = 1
workers.cores = 2
with workers:
    hists = runner(filelist, "Events", MyProcessor())

<div class="alert alert-success"> Note how the values for current allocation adapt to observed resources values. Once that happens, several processing tasks run concurrently in the worker. Also note how resources are divided uniformly according to values observed. <br>(For accumulating tasks, there are not enough tasks to gather information.) </div>

### Other resource management modes

#### Mixing explicitly declared resources and automatic modes

If resources are explicitly declared, such as `WorkQueueExecutor(..., cores=4, memory=4096, disk=8192)`, then Work Queue uses the values given as the maximum allocation a task can use. Otherwise, the maximum allocation is to use a whole worker.

#### Maximum throughput mode

The default resource management mode makes new resource allocations using the maximum resources seen. This mode works well for most Coffea situations, where processing functions use the same number of cores. However, concurrency may be greatly reduced if there are outliers that use more resources than the rest of the tasks. For such situations, the executor can be directed to use its maximum throughput setting, where Work Queue optimizes the number of tasks done per second, at the expense of retrying outliers using whole workers. For processing tasks that use about the same amount of resources, as is often the case in Coffea, both the default and the maximum throughput modes produce the same allocations.

```python
executor = WorkQueueExecutor(port=port, resources_mode="max-throughput")
```

Note that the maximum throughput mode, if active, only applies to processing tasks. Accumulating tasks always use the maximum seen allocations because they tend to grow in their resource usage as the run progresses.

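The trade-off between the two modes can be illustrated with a toy model. This is a sketch under stated assumptions, not Work Queue's actual algorithm: every task is assumed to take the same time, a task that exceeds its allocation is retried once with a whole worker, and the cost of a task is the memory it holds while running. The function names and the sample values are hypothetical.

```python
def expected_cost(allocation, samples, worker_size):
    """Average memory held per task (MB x task runtime) for an allocation.

    Every task first runs with `allocation`. Tasks whose peak usage exceeds
    it are retried once with the whole worker.
    """
    retries = sum(1 for peak in samples if peak > allocation)
    return allocation + worker_size * retries / len(samples)


def max_seen_allocation(samples):
    # Default mode in this model: allocate the largest value observed.
    return max(samples)


def max_throughput_allocation(samples, worker_size):
    # Try every observed value and keep the one with the lowest average cost.
    candidates = sorted(set(samples))
    return min(candidates, key=lambda a: expected_cost(a, samples, worker_size))


# Hypothetical peak memory (MB) of 20 processing tasks: 19 typical, 1 outlier.
samples = [200] * 19 + [1500]
worker_memory = 4000

print(max_seen_allocation(samples))                       # 1500
print(max_throughput_allocation(samples, worker_memory))  # 200
```

With the outlier present, the maximum seen allocation fits only 2 tasks in the 4000 MB worker, while the smaller allocation fits 20 and pays for one retry. Without outliers, both functions return the same value.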
#### Fixed mode

With the fixed mode, the executor does not adapt any of the allocations. If no resource is declared, then tasks use whole workers. Otherwise, undeclared resources are allocated in proportion to the resources that were declared.

```python
executor = WorkQueueExecutor(port=port,
                             resources_mode="fixed",
                             cores=1,
                             memory=4096,
                             disk=8192)
```

If you want Work Queue to simply manage the resources, but not to enforce them, the resource monitor can be turned off:

```python
executor = WorkQueueExecutor(port=port,
                             resources_mode="fixed",
                             resource_monitor="off",
                             cores=1,
                             memory=4096,
                             disk=8192)
```




## When tasks are too big for the workers

### Tasks whose declared resources don't fit current resources

If resources are declared, as in `WorkQueueExecutor(..., memory=64000, ...)`, but no connected worker is large enough, then Work Queue **will wait until a larger worker connects.**

### Tasks that during runtime don't fit whole workers

Similar to the previous case, when no explicit resources are declared, and the observed resources are larger than the currently connected workers, then Work Queue waits for larger workers to connect.

In [None]:
%%script echo skipping

workers = Factory("local", manager_host_port = f"localhost:{port}")
workers.memory = 4000
workers.disk = 10000

executor = WorkQueueExecutor(port=port, cores=1, memory=8000)
runner = Runner(executor)

with workers:
    hists = runner(filelist, "Events", MyProcessor())

<div class="alert alert-danger"> The above cell will block the notebook. You will need to start a large enough worker from a terminal. Below we will show how to do that. </div>

### When tasks are too big for their explicit allocations

For preprocessing and accumulation tasks, if they exhaust explicitly set limits then they **fail permanently**, and therefore the whole run also fails. This is because these tasks cannot be modified so that they use fewer resources.

Processing tasks are handled differently, and are the topic of the next section.

<div class="alert alert-warning"> If using mybinder, examples from here on may not fit in memory.
This will cause your notebook to disconnect. </div>

## Adapting the number of events per processing task (i.e., the chunksize)

### Automatic splits

When a processing task exhausts its explicitly allocated resources, the set of events is divided and new processing tasks are created. This process is repeated as necessary, until the maximum number of retries is reached (default is 5), or until the chunksize cannot be divided any further.

In general, more than a handful of splits is indicative that the desired chunksize is too big, and it should be reduced for further runs.

In [None]:
# To test adapting the chunksize, we use a modified NanoTestProcessor.py,
# that artificially increases the memory used as a function of the number
# of events in the task.

from MemNanoTestProcessor import MemNanoTestProcessor as MyProcessor

In [None]:
# Also, since this processor is not part of coffea, we need to tell
# Work Queue to make it available at the workers. This is convenient,
# as we can quickly modify the processor without the need to re-install
# or re-build the coffea environments at the remote sites.

executor = WorkQueueExecutor(
    port=port,
    memory=400,
    extra_input_files=["MemNanoTestProcessor.py"])

# set a chunksize to ensure that tasks fail at first
runner = Runner(executor, chunksize=40000) 

In [None]:
workers = Factory("local", manager_host_port = f"localhost:{port}")
workers.max_workers = 1
workers.cores = 2
workers.memory = 800
workers.disk = 10000

with workers:
    hists = runner(filelist, "Events", MyProcessor())

<div class="alert alert-success"> Note how the min chunksize goes down, and task exhausted events goes up. </div>

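The splitting behaviour described in this section can be sketched in plain Python. This is a simulation under assumptions, not the executor's code: the events are split in halves (how the executor actually divides them is not stated here), memory is assumed to grow linearly with the number of events, and the names `memory_needed` and `process_with_splits` are hypothetical.

```python
def memory_needed(n_events, events_per_mb=50):
    # Hypothetical cost model: memory grows linearly with the number of events.
    return n_events / events_per_mb


def process_with_splits(start, stop, memory_limit, max_retries=5, attempt=0):
    """Return the (start, stop) event ranges that were finally processed."""
    n_events = stop - start
    if memory_needed(n_events) <= memory_limit:
        return [(start, stop)]  # the task fits in its allocation

    if attempt >= max_retries or n_events < 2:
        raise MemoryError(f"events [{start}, {stop}) do not fit in {memory_limit} MB")

    # The task exhausted its allocation: split the events and retry each part.
    middle = start + n_events // 2
    return (process_with_splits(start, middle, memory_limit, max_retries, attempt + 1)
            + process_with_splits(middle, stop, memory_limit, max_retries, attempt + 1))


# A 40000-event chunk needs 800 MB in this model, but only 250 MB are allowed.
ranges = process_with_splits(0, 40000, memory_limit=250)
print(ranges)
# [(0, 10000), (10000, 20000), (20000, 30000), (30000, 40000)]
```

Two rounds of splitting were needed here, which mirrors the advice above: if many splits occur, start the next run with a smaller chunksize.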
### Dynamic chunksize computation

The Work Queue executor can also adapt the chunksize to meet a target memory usage. In the following example we start with a small chunksize, which grows to fill the desired allocations.

In [None]:
target_memory = 500  # 0.5GB

executor = WorkQueueExecutor(
    port=port,
    extra_input_files=["MemNanoTestProcessor.py"],
    memory=target_memory)  # do not use more than the target memory

# set a small chunksize, to create tons of tasks from the single example file.
runner = Runner(
    executor, 
    chunksize=2048,
    dynamic_chunksize={"memory": target_memory})


workers = Factory("local", manager_host_port = f"localhost:{port}")
workers.max_workers = 1
workers.cores = 2
workers.memory = 1000
workers.disk = 10000

with workers:
    hists = runner(filelist, "Events", MyProcessor())

<div class="alert alert-success"> Note how the current chunksize and the largest seen memory values go up. </div>

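As an illustration of how a chunksize can grow toward a memory target, the sketch below rescales the chunksize after each measurement. It is a toy feedback loop under assumptions, not the estimator used by the executor: the task is assumed to use a fixed overhead plus memory proportional to the number of events, and `observed_memory` and `next_chunksize` are hypothetical helpers.

```python
def observed_memory(chunksize, base_mb=100.0, events_per_mb=200.0):
    # Hypothetical task: a fixed overhead plus memory that grows with the events.
    return base_mb + chunksize / events_per_mb


def next_chunksize(chunksize, memory_used, target_memory):
    # Scale the chunksize by how far the measurement is from the target.
    return max(1, int(chunksize * target_memory / memory_used))


target_memory = 500  # MB
chunksize = 2048     # small initial chunksize

history = [chunksize]
for _ in range(10):
    used = observed_memory(chunksize)
    chunksize = next_chunksize(chunksize, used, target_memory)
    history.append(chunksize)

print(history)
print(observed_memory(history[-1]))
```

In this model the chunksize increases at every step and the memory used approaches the target from below, which is the same qualitative behaviour reported in the output of the cell above.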
## Using remote workers

### Using a catalog of names

All the examples above assumed that Coffea and the workers were running on the same machine, and that we knew which port workers should use to connect to the Work Queue executor. In general, keeping track of addresses and ports is not very practical, but we can use a service to match workers and applications. This service is called the **catalog server**, and by default it points to a server running at the University of Notre Dame. (You can also set up your own catalog server.)

<img src="https://github.com/cooperative-computing-lab/coffea-wq-notebook/blob/master/images/coffea-wq-general-catalog.png?raw=1" title="Coffea Work Queue basic architecture" style="display:block; margin:auto;"/>



To use the catalog server, we need to give a name to the application, pass this name to the factory, and create a password file:

In [None]:
with open("mypassword.txt", "w") as f:
    f.write("mysecretepassword")
    
executor = WorkQueueExecutor(
    master_name="hal",
    password_file="mypassword.txt",
    extra_input_files=["MemNanoTestProcessor.py"])
                       
runner = Runner(executor, chunksize=32768)

In [None]:
#workers = Factory("condor", "hal")
workers = Factory("local", "hal")
workers.password = "mypassword.txt"
workers.max_workers = 1

# with Factory("condor", ...) or other batch systems
# the worker resource specifications become part
# of the batch job specification
workers.cores = 2
workers.memory = 800
workers.disk = 10000

with workers:
    hists = runner(filelist, "Events", MyProcessor())

### Create an environment

In all the examples above we used workers that were running on the same machine as the Coffea application (i.e., we used `Factory("local", ...)`). When a batch system is available, we can direct the factory to launch workers that run on remote machines. For example, instead of `"local"`, the factory accepts `"condor"`, `"slurm"`, and others.

This poses a problem, as it is likely that the remote machines will not have the python environment that is needed for the tasks. The Work Queue executor can send a python environment together with the tasks, and set it up at run time accordingly. Workers cache the environment, so that it has to be set up only once per Coffea application.

The environments that the Work Queue executor expects are files created by the `conda-pack` tool. Work Queue provides a tool, `poncho_package_create`, that streamlines the environment creation:

In [None]:
# create the environment specification
import sys
import json


spec = {  "conda": {} }
spec["conda"]["channels"] = ["conda-forge"]


# match the python version to be installed with the current one
py_ver = f"{sys.version_info[0]}.{sys.version_info[1]}"

spec["conda"]["packages"] = ["coffea=0.7.18", "ndcctools=7.4.14", "xrootd", f"python={py_ver}"]

# any pip packages installed with either pip install . or pip install -e .
# are given preference and correctly included
spec["conda"]["pip"] = []


my_env_spec_file = "my_env_spec.json"
with open(my_env_spec_file, "w") as f:
    json.dump(spec, f, indent=4)

In [None]:
with open(my_env_spec_file) as f:
    print(f.read())

In [None]:
%%bash
# shell command

# create env file, but only if it is not there
if [[ ! -f myenv_to_send_away_file.tar.gz ]]
then
   # this may take a handful of minutes
   poncho_package_create my_env_spec.json myenv_to_send_away_file.tar.gz
fi

In [None]:
# Finally, we tell the executor about the environment file we just created:

executor = WorkQueueExecutor(
    #port=(9123,9130),
    master_name="hal",
    environment_file="myenv_to_send_away_file.tar.gz",
    password_file="mypassword.txt",
    extra_input_files=["MemNanoTestProcessor.py"])
                       
runner = Runner(executor, chunksize=32768)

# we can also tell the workers about the environment
# this is convenient if reusing workers across workflows
# workers.python_package = "myenv_to_send_away_file.tar.gz"

with workers:
    hists = runner(filelist, "Events", MyProcessor())


### The factory outside a notebook

Often the workers have to be launched in a machine that is different from the one running the Coffea application. In such cases we cannot use the `with workers:` statement. However, we can still use the factory as a shell command. The command takes as input a configuration file, which is monitored for changes. This makes it easy to adjust the size and number of new workers while the factory is running.

In [None]:
factory_conf = """
{
    "manager_name": "hal",
    "max_workers": 250,
    "min_workers": 0,
    "cores": 4,
    "memory": cores * 1024,
    "disk": cores * 2048
}
"""

with open("factory.jx", "w") as f:
    f.write(factory_conf)

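Hand-written configuration strings are easy to get wrong (a missing quote or comma is enough). As an alternative sketch, the same file can be generated from a Python dictionary with the standard `json` module, computing memory and disk in Python instead of with expressions inside the file. The key names are copied from the configuration above and are not verified here against the `work_queue_factory` documentation; the helper `write_factory_conf` is hypothetical.

```python
import json


def write_factory_conf(path, manager_name, cores, max_workers=250, min_workers=0):
    """Write a factory configuration file and return the dictionary written."""
    conf = {
        "manager_name": manager_name,
        "max_workers": max_workers,
        "min_workers": min_workers,
        "cores": cores,
        "memory": cores * 1024,  # MB, same ratio as in the configuration above
        "disk": cores * 2048,    # MB, same ratio as in the configuration above
    }
    with open(path, "w") as f:
        json.dump(conf, f, indent=4)
    return conf


conf = write_factory_conf("factory.jx", "hal", cores=4)
print(json.dumps(conf, indent=4))
```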
In [None]:
%%script echo skipping

# in a terminal
work_queue_factory -P mypassword.txt -T condor -C factory.jx --python-package myenv_to_send_away_file.tar.gz

# If adding python-package here for the environment, you don't need to add the environment_file parameter to
# the executor.