# Running CellProfiler jobs on HPC clusters using `dask-jobqueue`

Author: _Volker Hilsenstein_ (Monash Micro Imaging)

### Aim:
This Jupyter notebook guides you step by step through running a CellProfiler pipeline in parallel on a High-Performance Computing (HPC) cluster. The Python code for the widgets is kept in separate `.py` files that are imported.

Here, we run on Monash University's M3 _Massive_ cluster, which uses the SLURM scheduler. With minor changes, this should also work on clusters running LSF, Moab, SGE or PBS: change the `from dask_jobqueue import SLURMCluster as Cluster` statement accordingly (see https://dask-jobqueue.readthedocs.io/en/latest/api.html).
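If you switch between clusters, the class swap can also be made explicit with a lookup table. This is a hypothetical helper, not part of the notebook's `.py` files; the class names are those listed in the dask-jobqueue API reference linked above, and `dask_jobqueue` is only imported when the function is called.

```python
import importlib

# Scheduler name -> dask-jobqueue cluster class name (see the dask-jobqueue API reference)
CLUSTER_CLASSES = {
    "slurm": "SLURMCluster",
    "pbs": "PBSCluster",
    "sge": "SGECluster",
    "lsf": "LSFCluster",
    "moab": "MoabCluster",
}


def load_cluster_class(scheduler):
    """Return the dask-jobqueue cluster class for a scheduler name (hypothetical helper)."""
    try:
        class_name = CLUSTER_CLASSES[scheduler.lower()]
    except KeyError:
        raise ValueError("Unsupported scheduler: %r" % scheduler) from None
    # Import lazily so this helper can be defined without dask-jobqueue installed
    module = importlib.import_module("dask_jobqueue")
    return getattr(module, class_name)
```

`Cluster = load_cluster_class("pbs")` would then stand in for `from dask_jobqueue import PBSCluster as Cluster`. Scheduler-specific options such as queue or project names still go into the matching section of `jobqueue.yaml`.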


### Instructions:

If you have never used Jupyter notebooks before:
To run this notebook, go through the individual code cells (some of them contain widgets), make changes where necessary, and execute each cell by pressing Shift+Enter. You will find many Jupyter notebook tutorials online.


### Prerequisites:

* It is assumed that you have created a CellProfiler pipeline and that you have 
* It is assumed that CellProfiler is installed on your cluster. This is a non-trivial step and you will probably need help from your HPC support team. If your cluster supports the Singularity container environment, you can find a build recipe for a container running CellProfiler 3.1.5 here: https://github.com/VolkerH/MyDockerStuff/tree/master/SingularityCellprofiler
* You will have to set up a conda environment in your user space on the cluster and set up an SSH tunnel. I recommend this video (https://www.youtube.com/watch?v=FXsgmwpRExM) by Matt Rocklin for guidance. If you use `autossh` instead of `ssh` to set up the tunnel, the tunnel is automatically re-established when you intermittently lose your network connection. This is useful if your local machine is a laptop: you can shut it down or change Wi-Fi networks and still keep the connection to your Jupyter notebook open. On Windows 10 you can easily run `autossh` after installing the Windows Subsystem for Linux.
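As an illustration of the tunnel described above, the hypothetical helper below only builds the command line; it does not run it. The host name, user name and port 8888 are placeholders, and it assumes the Jupyter server listens on port 8888 on `localhost` of the host you connect to. `-M 0` turns off autossh's own monitoring port and relies on ssh's `ServerAliveInterval` keep-alives instead, a combination described in the autossh documentation.

```python
import shlex


def tunnel_command(login_host, user, remote_port=8888, local_port=8888, use_autossh=True):
    """Build an (auto)ssh command forwarding a local port to a remote Jupyter server.

    All arguments are placeholders to adapt to your cluster (hypothetical helper).
    """
    cmd = ["autossh", "-M", "0"] if use_autossh else ["ssh"]
    # -N: only forward ports, do not run a remote command
    # ServerAlive*: let ssh detect a dead connection so autossh can restart it
    cmd += ["-N", "-o", "ServerAliveInterval=30", "-o", "ServerAliveCountMax=3"]
    # -L: forward local_port on this machine to remote_port on the remote host
    cmd += ["-L", "%d:localhost:%d" % (local_port, remote_port)]
    cmd.append("%s@%s" % (user, login_host))
    return cmd


# Print the command so it can be pasted into a local terminal
print(" ".join(shlex.quote(a) for a in tunnel_command("login.example.org", "jdoe")))
```

Once the tunnel is up, the notebook is reachable at `http://localhost:8888` on your local machine.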

# Import libraries
Just press Shift+Enter and wait until the cell has finished running.

In [None]:
import re
import pandas as pd
import subprocess
from dask_jobqueue import SLURMCluster as Cluster
from dask import delayed
from dask.distributed import Client, as_completed, progress
from distributed.scheduler import KilledWorker
from tqdm import tqdm
import ipywidgets as widgets
from notebook_widgets import time_per_im, im_per_batch, batchfile, walltime_chooser
from notebook_widgets import resource_chooser, gb_of_RAM, nr_cpus
from notebook_widgets import FileBrowser
from csv_concat import *

# Fixed settings

The following cells define settings that are unlikely to change between experiments but may have to be set once per user (or system).

## `cpexe`

Set this to the path of the CellProfiler executable (this might be a wrapper script).

In [None]:
cpexe = '/home/vhil0002/mycp'

## Cluster options

The High-Performance Computing (HPC) environment at your institution may require parameters specific to that environment, such as queue names or project names.

You can set such parameters in the configuration file `.config/dask/jobqueue.yaml` in your home directory; edit it with a text editor. Refer to your local HPC documentation or support team to determine which parameters you need to set.
(Also see https://www.youtube.com/watch?v=FXsgmwpRExM from minute 11:20 onwards)

In [None]:
# Just have a look at our dask jobqueue settings
!cat ~/.config/dask/jobqueue.yaml

# Resources (CPUs, RAM)

In [None]:
resource_chooser

# File browser: select the batch file

The following widget provides a file selection box. Navigate to `Batch_data.h5`.
You generate this file by adding the `CreateBatchFiles` module to your CellProfiler pipeline and running the pipeline once with the module enabled.

In [None]:
batchfile.widget()

In [None]:
batchfile.path
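Before generating commands, it can save a failed cluster run to check the selected path. The hypothetical helper below (not part of `notebook_widgets`) only checks that the file exists and warns if it is not named `Batch_data.h5`, the name `CreateBatchFiles` uses by default.

```python
import os


def check_batchfile(path):
    """Sanity-check the batch file chosen in the file browser (hypothetical helper)."""
    if not os.path.isfile(path):
        raise FileNotFoundError("No file at %r" % path)
    if os.path.basename(path) != "Batch_data.h5":
        # Not fatal: the file may have been renamed, but a wrong pick is a common mistake
        print("Warning: expected Batch_data.h5, got", os.path.basename(path))
    return os.path.abspath(path)
```

After selecting the file, call it as `check_batchfile(batchfile.path)`.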

# Select batch size and approximate processing time per image (for the walltime)

The batch size (images per batch) is the number of image sets processed by each cluster job.

The `walltime` is the time allocated to a job on the cluster. Once the `walltime` has elapsed, the job is killed, whether or not the CellProfiler process has finished. A short `walltime` makes it easier for the scheduler to find resources to squeeze the job in. Try to set the `walltime` to the time needed for processing plus a little buffer in case processing takes longer. Setting it too high means your jobs will wait longer in the queue before they are allocated.

With the following sliders you can select the batch size and your estimate of the CellProfiler runtime per image set. (To obtain such an estimate, run a few image sets in test mode. Processing times may differ between image sets, e.g. due to different numbers of objects, so round your estimate up to allow for a buffer.)

The `walltime` will be calculated as the number of images per batch times the estimated processing time per image set. 

In [None]:
walltime_chooser

In [None]:
walltime = time_per_im.value * im_per_batch.value
print("Chosen walltime is", walltime, "minutes.")
print("If you want to change this value, just assign to the variable walltime")
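The walltime formula above can be wrapped in two small functions, sketched here under a few assumptions: the optional `buffer_fraction` is a hypothetical addition (the notebook itself adds no buffer, so it defaults to 0), and the result is rounded up to whole minutes because `str(walltime)+":00"` in the cluster cell further down only yields a valid SLURM time (`minutes:seconds`) for integer minutes. `slurm_time` produces the `hours:minutes:seconds` form that SLURM also accepts.

```python
import math


def walltime_minutes(minutes_per_image_set, images_per_batch, buffer_fraction=0.0):
    """Estimated walltime in whole minutes, optionally padded by a safety buffer."""
    raw = minutes_per_image_set * images_per_batch * (1 + buffer_fraction)
    # round() first so float noise such as 11.000000000000002 does not round up to 12
    return math.ceil(round(raw, 6))


def slurm_time(minutes):
    """Format whole minutes as an HH:MM:SS string for SLURM's time limit."""
    hours, mins = divmod(int(minutes), 60)
    return "%02d:%02d:00" % (hours, mins)
```

For example, `walltime = walltime_minutes(time_per_im.value, im_per_batch.value, buffer_fraction=0.2)` followed by `walltime=slurm_time(walltime)` in the `Cluster(...)` call would add a 20% margin.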

# Generate Batchfiles and list of commands

Cellprofiler can generate its own list of batch commands when called with the options

* `--get-batch-commands` followed by the path to the `Batch_data.h5` file
* `--images-per-batch` followed by the number of image sets that each separate job should process.
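To get a feeling for how many jobs to expect, the split into batches can be illustrated as consecutive ranges of image set numbers. This is only an illustration under a simplifying assumption: CellProfiler computes the batches itself, and pipelines that use grouping may be split differently.

```python
def batch_ranges(n_image_sets, images_per_batch):
    """Split image sets 1..n_image_sets into consecutive (first, last) ranges.

    Illustration only; CellProfiler does the real split with --get-batch-commands.
    """
    if images_per_batch < 1:
        raise ValueError("images_per_batch must be at least 1")
    return [
        (first, min(first + images_per_batch - 1, n_image_sets))
        for first in range(1, n_image_sets + 1, images_per_batch)
    ]


print(batch_ranges(25, 10))
```

With 25 image sets and 10 images per batch this gives three jobs, the last one processing only 5 image sets; the number of jobs is the number of image sets divided by the batch size, rounded up.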

We run Cellprofiler with these options to create the list of shell commands. 
This may take a moment.

In [None]:
cmd = [cpexe, '--get-batch-commands='+batchfile.path, '--images-per-batch='+str(im_per_batch.value)]
print("Running  "+" ".join(cmd)+ " to generate batch commands")
batchcmds = subprocess.check_output(cmd)

# Substitute executable path
As the commands generated by Cellprofiler assume that the Cellprofiler binary is called `CellProfiler`, we substitute this part of the command with the path to our Cellprofiler executable that is stored in the variable `cpexe`.

In [None]:
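import shlex

lines = batchcmds.decode("utf-8").splitlines()
# Ignore output lines that are not CellProfiler commands
lines = [l for l in lines if l.startswith("CellProfiler")]
# Set the correct executable; a function replacement keeps backslashes in cpexe literal
cmds = [re.sub(r"^CellProfiler", lambda m: cpexe, l) for l in lines]
# Split into argument lists; shlex.split also handles quoted paths containing spaces
cmds = [shlex.split(c) for c in cmds]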
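To see what the substitution does, here it is applied to a made-up sample of the `--get-batch-commands` output. The exact output format (`-c -r -p ... -f ... -l ...`) is an assumption based on CellProfiler 3.x and may differ between versions; the paths are placeholders.

```python
import re
import shlex

# Hypothetical output of `CellProfiler --get-batch-commands=...`; the real text may differ
sample_output = """Some informational line that is not a command
CellProfiler -c -r -p /scratch/proj/Batch_data.h5 -f 1 -l 10
CellProfiler -c -r -p /scratch/proj/Batch_data.h5 -f 11 -l 20
"""

cpexe_example = "/home/user/mycp"  # placeholder path to a CellProfiler wrapper script

commands = []
for line in sample_output.splitlines():
    if not line.startswith("CellProfiler"):
        continue  # skip lines that are not commands
    # A function as replacement means re.sub does not interpret backslashes in the path
    line = re.sub(r"^CellProfiler", lambda m: cpexe_example, line)
    commands.append(shlex.split(line))

for c in commands:
    print(c)
```

Each entry is an argument list that can be handed to `subprocess` without a shell.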

# Now create a cluster of workers

In [None]:
#cluster=Cluster(cores=1, memory=str(gb_of_RAM.value)+"GB", projects='su62', walltime=str(walltime)+":00")
cluster=Cluster(cores=1, memory=str(gb_of_RAM.value)+"GB", walltime=str(walltime)+":00", job_cpu=nr_cpus.value)

In [None]:
# Inspect the job script that will be submitted for each worker
print(cluster.job_script())

In [None]:
client=Client(cluster)

In [None]:
cluster.scale(len(cmds))

### Dask Dashboard
See: https://github.com/VolkerH/Cellprofiler_HPC_DaskJobqueue/blob/master/Dashboard_via_nbserverproxy.md

# Submit jobs

In [None]:
# Submit one task per batch command (for a quick test run, submit a slice such as cmds[:10])
futures = client.map(lambda c: subprocess.check_output(c, stderr=subprocess.STDOUT), cmds)
progress(futures)
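With `subprocess.check_output`, a CellProfiler run that exits with a non-zero status raises an exception on the worker, and its log output is only available inside that exception. An alternative, sketched below as a hypothetical helper, returns the exit code together with the combined output, so failed batches show up as data.

```python
import subprocess
import sys


def run_cp(cmd):
    """Run one batch command and return (returncode, combined stdout/stderr text).

    Unlike subprocess.check_output, a failing command does not raise.
    """
    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    return proc.returncode, proc.stdout.decode("utf-8", errors="replace")


# Local smoke test with a harmless command instead of CellProfiler
code, out = run_cp([sys.executable, "-c", "print('hello from a worker')"])
print(code, out)
```

It would be submitted as `futures = client.map(run_cp, cmds)`. The results are then `(returncode, text)` tuples, so `print_CP_res` would have to print `r[1]` instead of decoding bytes.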

# Collect output and shut down workers

In [None]:
# collect results (CellProfiler output); raise_errors=False keeps the loop going when a job fails
res = []
for future, result in as_completed(futures, with_results=True, raise_errors=False):
    if future.status == "error":
        print(future, "produced an exception:", future.exception())
    else:
        print(future, "finished")
        res.append(result)
# now that we have all results we can shut down the client and the cluster
client.close()
cluster.close()

In [None]:
def print_CP_res(r):
    print(r.decode('utf-8'))

for r in res:
    print_CP_res(r)

# Collect and concatenate `.csv` files

When running on a cluster, you will want to set up the `ExportToSpreadsheet` module in CellProfiler so that it writes a separate `.csv` file for each image set (or for each batch). You can achieve this by selecting `Elsewhere` as the output folder and including a metadata field in the file name (right-click in the text box to insert a metadata tag).

The reason for saving to separate files is to prevent different cluster jobs from writing to the same files, which may lead to permission errors or overwrite existing results.


### Select the folder below which the script should search for `.csv` files.

In [None]:
csv_folder = FileBrowser("/scratch/su62/")
csv_folder.widget()

### Select the folder where the merged `.csv` files should go.

In [None]:
concat_csv_folder = FileBrowser("/scratch/su62/")
concat_csv_folder.widget()

# Do the `.csv` merge

In [None]:
concat_csvs(csv_folder.path, concat_csv_folder.path)
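The implementation of `concat_csvs` lives in `csv_concat.py` and is not shown here. For readers who want to adapt the merge, the following is a hypothetical stand-in using only the standard library, not that function. It assumes each `.csv` file has a single header row and that files belonging to the same table share the part of the file name after the last underscore (for example `A01_Nuclei.csv` and `A02_Nuclei.csv` both go into `Nuclei.csv`).

```python
import csv
import os
from collections import defaultdict


def merge_csvs(src_folder, dst_folder, table_name=lambda fn: fn.rsplit("_", 1)[-1]):
    """Concatenate .csv files below src_folder into one file per table name.

    Hypothetical stand-in for concat_csvs; returns {output path: number of data rows}.
    """
    # Collect all input files first, so outputs written below are never re-read
    groups = defaultdict(list)
    for root, _dirs, files in os.walk(src_folder):
        for fn in sorted(files):
            if fn.lower().endswith(".csv"):
                groups[table_name(fn)].append(os.path.join(root, fn))

    os.makedirs(dst_folder, exist_ok=True)
    written = {}
    for table, paths in groups.items():
        out_path = os.path.join(dst_folder, table)
        header = None
        n_rows = 0
        with open(out_path, "w", newline="") as out:
            writer = csv.writer(out)
            for path in paths:
                with open(path, newline="") as f:
                    reader = csv.reader(f)
                    file_header = next(reader, None)
                    if file_header is None:
                        continue  # empty file
                    if header is None:
                        header = file_header  # write the header only once per table
                        writer.writerow(header)
                    elif file_header != header:
                        print("Skipping", path, "(header differs)")
                        continue
                    for row in reader:
                        writer.writerow(row)
                        n_rows += 1
        written[out_path] = n_rows
    return written
```

Choose an output folder outside the search folder; otherwise a second run would pick up the merged files as input.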

# Remarks

* Some tasks in this pipeline have been broken into several steps to guide you through the process. In practice, you will probably want to combine several of the cells to automate things further.
