# Demo 3: Scaling up Python workflows using Dask

In this demo we show how analysis workflows can be scaled to multiple CPUs by parallelizing the processing with the `dask.distributed` package.

## 1. Creating a Dask cluster

A **Dask cluster** consists of a **scheduler** and multiple **workers**; the scheduler distributes the workload among the workers.

### 1.1 Types of Dask clusters
The types of clusters available in the Analysis Facility are:
- `LocalCluster` - uses local CPUs (inside the user's AF session).
- `PurdueSLURMCluster` - a custom SLURM cluster that distributes the workload to the SLURM batch system on the Purdue CMS (Hammer) cluster.

We recommend using the `SLURM` cluster, as it comes with useful features:
- You can specify a kernel that all workers will use.
- The SLURM cluster has access to many more CPUs and GPUs than `LocalCluster`.
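
For quick tests that do not need SLURM resources, a `LocalCluster` can also be created directly in code. The sketch below is a minimal illustration and not part of the original demo: the worker count is arbitrary, and `processes=False` and `dashboard_address=None` are assumptions chosen only to keep the example lightweight and self-contained.

```python
from dask.distributed import Client, LocalCluster

# Start a small in-process cluster (2 workers, 1 thread each).
# These settings are illustrative assumptions, not the AF defaults.
cluster = LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    dashboard_address=None,
)
client = Client(cluster)

# Count the workers known to the scheduler
n_workers = len(client.scheduler_info()["workers"])

# Submit a trivial task to confirm that the workers execute code
total = client.submit(sum, [1, 2, 3]).result()
print(f"{n_workers} workers, test task returned {total}")

# Release the resources when done
client.close()
cluster.close()
```

The same `Client` calls work unchanged with a SLURM cluster; only the way the cluster is created differs.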

### 1.2 Ensuring consistent environment between analysis workflow and Dask workers
It is crucial that the Dask workers have access to the software packages used in the analysis. Due to the way Dask operates, this includes not only the packages used in the part of the code executed by workers, but *all* packages used in the workflow.

This includes the following:
- **Python packages**: the Dask cluster should be launched with the same kernel (Conda environment) as the analysis workflow (Jupyter notebook or Python script). The kernel must also be visible to the workers; the best way to ensure this is to store it in **Depot**.
- **Local imports** (from other Python files in your analysis framework): these are made available by *explicitly uploading your code to the workers* (see Section 1.4).
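
One way to check that the workers see the same environment is to compare the Python interpreter path on each worker with the local one. The sketch below is a minimal illustration, not part of the original demo: the helper `python_executable` is a hypothetical name, and a throwaway in-process `LocalCluster` is started only so that the example runs on its own. In the notebook, the client connected in Section 1.3 would be used instead.

```python
import sys

from dask.distributed import Client, LocalCluster


def python_executable():
    """Return the path of the Python interpreter running this function."""
    import sys
    return sys.executable


# Throwaway in-process cluster (an assumption made for self-containment)
cluster = LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    dashboard_address=None,
)
client = Client(cluster)

# client.run executes the function on every worker and returns
# a dictionary keyed by worker address
worker_pythons = client.run(python_executable)

# Collect workers whose interpreter differs from the local one
mismatched = {
    address: exe
    for address, exe in worker_pythons.items()
    if exe != sys.executable
}
print("Workers with a different Python:", mismatched)

client.close()
cluster.close()
```

An empty dictionary suggests that all workers use the same interpreter as the notebook; `client.get_versions(check=True)` offers a related check on package versions.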

### 1.3 Starting a Dask cluster
- Open the Dask extension (click on the red logo in the left sidebar).
- Click the [+ NEW] button.
- In the dialog window, select `SLURM cluster` and `Python3 kernel [ML]`.
- Select the desired number of workers.
- Once the cluster is created, copy its IP address and paste it below to connect a Dask client to the cluster.

In [None]:
from dask.distributed import Client

client = Client("tcp://10.5.15.122:8786") # paste correct IP address here
# print(client)

### 1.4 Upload your code to workers
The code below shows how to upload the local code to workers. This is needed to ensure that the workers understand imports like `from submodule.event_selection import load_events`.

**Run the following cell, then restart this notebook, reconnect the Dask client to the SLURM cluster, and proceed with other cells.**

In [None]:
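from distributed.diagnostics.plugin import UploadDirectory
# Upload the current directory and the `submodule` package to the workers.
# update_path=True adds the uploaded directory to the workers' sys.path.
client.register_worker_plugin(UploadDirectory("./", restart=True, update_path=True))
client.register_worker_plugin(UploadDirectory("submodule", restart=True, update_path=True))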
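
After uploading, it can be useful to confirm that the workers are able to import a given module. The sketch below is a minimal illustration, not part of the original demo: `can_import` is a hypothetical helper, the standard-library module `json` stands in for `submodule.event_selection`, and an in-process `LocalCluster` is used only to keep the example self-contained.

```python
from dask.distributed import Client, LocalCluster


def can_import(module_name):
    """Return True if the module can be imported where this function runs."""
    import importlib
    try:
        importlib.import_module(module_name)
        return True
    except ImportError:
        return False


# Throwaway in-process cluster (an assumption made for self-containment)
cluster = LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    dashboard_address=None,
)
client = Client(cluster)

# Run the check on every worker; results are keyed by worker address.
# In the notebook, "json" would be replaced by "submodule.event_selection".
status_ok = client.run(can_import, "json")
status_missing = client.run(can_import, "module_that_does_not_exist_xyz")

print("json importable on all workers:", all(status_ok.values()))
print("missing module importable anywhere:", any(status_missing.values()))

client.close()
cluster.close()
```

If any worker reports `False` for your own module, the upload step most likely needs to be repeated.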

## 2. Load events

Before running this section:
- Create a Dask SLURM cluster in the extension
- Create a client as shown in Section 1.3
- Upload code to workers as shown in Section 1.4
- Restart the kernel
- Run Section 1.3 again to reconnect the client to the cluster

Here we load the pre-selected NanoAOD events using `uproot` (see code in `submodule/event_selection.py`).

In [None]:
import numpy as np
import pandas as pd
import torch

from submodule.event_selection import load_events
from submodule.dnn_model import NeuralNet

sources = ["data", "ttbar", "dy"]
server = "file:/depot/cms/purdue-af/demos/"
model_dir = "/depot/cms/purdue-af/demos/"
dfs = {}

features = ['mu1_pt', 'mu1_eta', 'mu2_pt', 'mu2_eta', 'dimuon_mass', 'met']

# load datasets for inference
for src in sources:
    dfs[src] = load_events(f"{server}/{src}.root")[features]


## 3. Example parallelization
To test the distributed processing setup, we run a simple DNN inference on three small datasets in parallel. DNN inference is used here only as an example of processing code that can be parallelized.

In [None]:
# Check whether a GPU is visible in this session
# (each worker repeats this check for itself inside `inference`)
if torch.cuda.is_available():
    print("Will use GPU for inference.")
else:
    print("Will use CPUs for inference.")


# The main processing function that will be executed in parallel on multiple datasets
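def inference(inp):
    # Each input is a (dataset label, DataFrame) pair
    label, df = inp
    model_path = model_dir + "/model.ckpt"

    # Each worker selects its own device: GPU if available, otherwise CPU
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    # Build the model and load the trained weights
    model = NeuralNet(6, [16, 8], 1).to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()

    # Convert the DataFrame to a float tensor and evaluate the model;
    # gradients are not needed for inference
    inputs = torch.from_numpy(df.values).to(device).float()
    with torch.no_grad():
        scores = model(inputs)
    scores = scores.cpu().detach().numpy().ravel()

    # Save DNN outputs to a file
    save_path = f"/depot/cms/users/dkondra/dnn_outputs/{label}.npy"
    np.save(save_path, scores, allow_pickle=True)
    return label, scores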


print("\nDatasets:", list(dfs.keys()))
# Distribute the datasets to workers
scattered_data = client.scatter(list(dfs.items()))

# Process the datasets in parallel and return the results
futures = client.map(inference, scattered_data)
results = client.gather(futures)

for res in results:
    print(res[0], res[1])
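
The scatter / map / gather pattern used above does not depend on PyTorch and can be tried on toy data first. The sketch below is a minimal illustration, not part of the original demo: `summarize` and `toy_datasets` are hypothetical stand-ins for `inference` and `dfs`, and an in-process `LocalCluster` replaces the SLURM cluster so that the example runs on its own.

```python
from dask.distributed import Client, LocalCluster


def summarize(item):
    """Toy processing function: return the label and the mean of the values."""
    label, values = item
    return label, sum(values) / len(values)


# Hypothetical toy inputs, one entry per "dataset"
toy_datasets = {
    "data": [1.0, 2.0, 3.0],
    "ttbar": [10.0, 20.0],
    "dy": [5.0],
}

# Throwaway in-process cluster (an assumption made for self-containment)
cluster = LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    dashboard_address=None,
)
client = Client(cluster)

# 1. Scatter: send each (label, values) pair to the workers
scattered = client.scatter(list(toy_datasets.items()))

# 2. Map: schedule one task per scattered item
futures = client.map(summarize, scattered)

# 3. Gather: collect the results back into the local session
results = dict(client.gather(futures))
print(results)

client.close()
cluster.close()
```

Scattering the inputs first avoids embedding large objects in the task graph, which matters more for real DataFrames than for these small lists.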

## 4. Plotting outputs
Run this cell after either the Dask parallelization example or the Triton example to plot the DNN outputs. The models used in these examples are different, so the outputs will not look the same. The models are generic and are not meant to carry any physics meaning.

In [None]:
import matplotlib.pyplot as plt
bins = np.linspace(0, 1, 100)
plt.figure(figsize=(5,4))

dnn_outputs = {}

for src in sources:
    load_path = f"/depot/cms/users/dkondra/dnn_outputs/{src}.npy"
    dnn_outputs[src] = np.load(load_path)

# Overlay normalized score distributions for each dataset
for src in ["dy", "ttbar", "data"]:
    plt.hist(dnn_outputs[src], bins, alpha=0.3, label=src, density=True)
plt.xlabel('DNN Score')
plt.ylabel('Events (normalized)')  # density=True normalizes each histogram
plt.legend(loc='upper left')