<a href="https://colab.research.google.com/github/casangi/cngi_prototype/blob/master/docs/benchmark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Benchmarks

As the package matures, applications designed to measure a suite of agreed-upon benchmarks (such as [airspeed velocity](https://asv.readthedocs.io/en/stable/)) may eventually prove useful. Until then, performance benchmarking will be carried out on an ad hoc basis.

Other open source projects that rely on similar framework components have produced their own benchmarking efforts, which can serve as a useful reference. One example is [Pangeo](https://github.com/pangeo-data/benchmarking), which as of July 2020 had managed deployments on [multiple HPC environments](https://pangeo.io/deployments.html#high-performance-computing-deployments) with processing capability on the order of 100+ TFLOP/s.

## Single-machine

### Dataset and Software
A dataset from the ALMA Band 9/10 Chemical Survey of NGC 6334I was chosen to benchmark the ngCASA (version 0.0.9) dirty imaging implementation against [CASA release 5.6.1-8.el7](https://casa.nrao.edu/download/distro/casa-pipeline/release/el7/casa-pipeline-release-5.6.1-8.el7.tar.gz) and the cube refactor in [CAS-9386](https://open-bamboo.nrao.edu/browse/CASA-C6DPT10-57/artifact/shared/tarfile/casa-CAS-9386-73.tar.xz).

The data can be obtained from the [ALMA science archive](https://almascience.nrao.edu/asax/) by entering `uid://A002/Xcb8a93/Xc096` in the ASDM uid search field.

To convert the dataset from the archival ASDM format to a measurement set (MS), use the `importasdm` task in CASA:
```python
importasdm(asdm="uid___A002_Xcb8a93_Xc096.asdm.sdm",vis="uid___A002_Xcb8a93_Xc096.ms")
```
Currently cngi-prototype (version 0.0.53) cannot convert to the LSRK reference frame (during CASA imaging, the frequency reference frame is changed to LSRK). Therefore, to ensure the same computation is done in both benchmarks, the dataset is converted to LSRK beforehand using the `mstransform` task in CASA:
```python
mstransform(vis="uid___A002_Xcb8a93_Xc096.ms",outputvis="combined_spw_uid___A002_Xcb8a93_Xc096.ms",spw="33,35,37,39,41,43,45,47",combinespws=True,regridms=True,nchan=7680,outframe="LSRK")
```
The spectral windows are also combined to produce a single data description ID (DDI). The resulting MS has:

- **Rows** : 706146
- **Channels** : 7680
- **Polarizations** : 2
- **Uncompressed Size of Visibilities** : 173.54 GB

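To convert the MS to the vis.zarr format used by ngCASA, the cngi-prototype (version 0.0.53) function `convert_ms` is used:
```python
from cngi.conversion import convert_ms

# Write the LSRK-converted MS to zarr with the chunk shape (time, baseline, chan, pol)
convert_ms(infile="combined_spw_uid___A002_Xcb8a93_Xc096.ms", chunk_shape=(23, 903, 10, 2))
```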

- **Visibility Data Dimensions (time,baseline,chan,pol)** : 782, 903, 7680, 2 
- **Zarr Chunk shape (time,baseline,chan,pol)** : 23, 903, 10, 2 
- **Uncompressed Size of a Zarr Chunk** : 6.65 MB

An uncompressed chunk size of 6.65 MB was chosen to adhere to the guidelines given in the [zarr tutorial](https://zarr.readthedocs.io/en/stable/tutorial.html#chunk-optimizations). Multiplying the time and baseline dimensions yields the number of rows in the MS (782 × 903 = 706146). This will not always be the case, as the number of observing antennas can change during an observation; the `convert_ms` function fills the missing values with `np.nan`.
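The sizes quoted above follow directly from the array dimensions if each visibility is stored as a 16-byte `complex128` value. That item size is our assumption, but it reproduces the numbers in the text exactly. A minimal sketch of the arithmetic:

```python
# Dimensions of the converted visibility data (time, baseline, chan, pol)
VIS_SHAPE = (782, 903, 7680, 2)
ZARR_CHUNK = (23, 903, 10, 2)
BYTES_PER_VIS = 16  # assumption: complex128 visibilities


def n_bytes(shape, itemsize=BYTES_PER_VIS):
    """Uncompressed size in bytes of an array with the given shape."""
    total = itemsize
    for dim in shape:
        total *= dim
    return total


rows = VIS_SHAPE[0] * VIS_SHAPE[1]  # time x baseline = rows in the MS
print(f"MS rows:         {rows}")
print(f"Visibility data: {n_bytes(VIS_SHAPE) / 1e9:.2f} GB")
print(f"Zarr chunk:      {n_bytes(ZARR_CHUNK) / 1e6:.2f} MB")
```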







### Hardware Setup

The node cvpost020 on the cvpost cluster was reserved for the benchmarks. The node has two sockets, each with an Intel Xeon E5-2640 v3 CPU (16 cores in total), and 256 GB of RAM. The MS and vis.zarr datasets are stored on the CV Lustre file system.

### Benchmark
The benchmark is the time to create a dirty image cube (500x500 pixels, 7680 channels, 2 polarizations, 30.72 GB uncompressed as 64-bit floats). The available memory is limited to 64 GB (4 GB per core on the 16-core node). Therefore, the visibility data to be imaged (173.54 GB) is larger than the available memory.

#### CASA

A shell script is used to launch CASA with different numbers of MPI processes:
```sh
#!/bin/bash
# bash (not sh) is required for the {1..17} brace expansion below
export CASA_VERSION=CASA_5.6.1-8
export CASAPATH=/.lustre/cv/users/jsteeb/CASA/casa-pipeline-release-5.6.1-8.el7
# Uncomment to benchmark the cube refactor instead:
#export CASA_VERSION=CAS-9386-73
#export CASAPATH=/.lustre/cv/users/jsteeb/casa-CAS-9386-73
# Read by benchmark_cvcluster_casa.py
export chanchunks=-1
export parallel=True
# Run the benchmark with 1 to 17 MPI processes, one log file per run
for i in {1..17}
do
    xvfb-run -d $CASAPATH/bin/mpicasa -N $i $CASAPATH/bin/casa --nogui --logfile "benchmark_cvcluster_casa_n${i}.log" -c benchmark_cvcluster_casa.py
done
```
The CASA benchmarking Python script:
```python
import os

# casalog and tclean are provided by the CASA session
casalog.filter('INFO3')
vis_name = '/.lustre/cv/users/jsteeb/NGCASA/data/combined_spw_uid___A002_Xcb8a93_Xc096.ms'
niter = 0  # no deconvolution: dirty image only
gridder = 'standard'
imsize = [500, 500]
cell = ['0.02arcsec']
specmode = 'cube'
weighting = 'natural'
# Settings exported by the launch script
chanchunks = int(os.getenv('chanchunks'))
parallel = os.getenv('parallel') == 'True'

# Remove the output of any previous run, including leftover TempLattice files
imagename = '/.lustre/cv/users/jsteeb/NGCASA/temp'
os.system('rm -rf ' + imagename + '*')
os.system('rm -rf /.lustre/cv/users/jsteeb/NGCASA/TempLattice*')
tclean(vis=vis_name, imagename=imagename, imsize=imsize, cell=cell, stokes='XXYY',
       specmode=specmode, gridder=gridder, weighting=weighting, niter=niter,
       chanchunks=chanchunks, parallel=parallel)
```
`chanchunks` is set to -1 so that CASA automatically calculates the number of channel chunks needed to stay within the memory limit. To control memory, the following line is added to the `~/.casarc` file:

```
system.resources.memory: 64000
```

#### ngCASA

To avoid thread collisions (oversubscription of the CPU cores) when using the Dask.distributed Client, set the following environment variables:
```sh
export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1
```
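When the benchmark is started from Python (for example from a notebook) rather than from a shell, the same variables can be set through `os.environ`. A minimal sketch, assuming it runs before NumPy, Numba, or any other OpenMP- or BLAS-backed library is first imported, since such libraries typically read these variables only when their thread pools are created:

```python
import os

# Limit the internal thread pools of OpenMP, MKL and OpenBLAS to one thread each,
# so that the Dask worker threads are the only source of parallelism.
for var in ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS"):
    os.environ[var] = "1"

# Import numerical libraries only after this point, e.g.:
# import numpy as np
```

These libraries usually size their thread pools to the number of cores by default, so with 16 Dask threads each calling into a multithreaded library, far more threads than cores could end up competing for the 16 cores.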

To improve Numba performance on Intel CPUs, the Intel short vector math library (SVML) can be installed:
```sh
conda install -c numba icc_rt
```
Other tips on improving Numba performance can be found [here](https://numba.pydata.org/numba-doc/latest/user/performance-tips.html).

Data compression is done with the [Numcodecs](https://numcodecs.readthedocs.io/en/stable/) package (used by Zarr). Since the E5-2640 v3 CPU supports AVX2 instructions, Numcodecs was installed using `pip install numcodecs` (installing with conda does not provide AVX2 support).

The ngCASA benchmarking Python script:
```python
if __name__ == '__main__':
    import os
    import xarray as xr
    from dask.distributed import Client
    import dask.array  as da
    from ngcasa.imaging import make_imaging_weight, calc_image_cell_size, make_image
    from cngi.vis import applyflags
    import zarr
    import time
    
    local_store_file = 'data/combined_spw_uid___A002_Xcb8a93_Xc096.vis.zarr/0'
    vis_dataset = xr.open_zarr(store=local_store_file, chunks={'time':782,'chan':40}, consolidated=True, overwrite_encoded_chunks = True)

    #Flag data
    vis_dataset = applyflags(vis_dataset, flags=['FLAG', 'FLAG_ROW'])
 
    #Make imaging weights
    storage_parms = {}
    storage_parms['to_disk'] = False 
    
    imaging_weight_parms = {}
    imaging_weight_parms['weighting'] = 'natural' 
    imaging_weight_parms['chan_mode'] = 'cube' 
    vis_dataset = make_imaging_weight(vis_dataset, imaging_weight_parms, storage_parms)
    
    #Make dirty image
    grid_parms = {}
    grid_parms['chan_mode'] = 'cube'
    grid_parms['imsize'] =  [500,500]
    grid_parms['cell'] = [0.02, 0.02]
    grid_parms['oversampling'] = 100
    grid_parms['support'] = 7
    grid_parms['fft_padding'] =  1.2
    
    storage_parms['to_disk'] = True
    storage_parms['outfile'] = 'data/cube_image_A002_Xcb8a93_Xc096.img.zarr' 
    
    max_threads = 17
    memory_limit = '64GB'
    
    for i in range(max_threads):
        bench_file = open('combined_spw_uid___A002_Xcb8a93_Xc096.txt','a')
        threads_per_worker = i + 1
        print('Threads per worker',threads_per_worker)
        n_worker = 1
        client = Client(n_workers=n_worker, threads_per_worker=threads_per_worker, memory_limit=memory_limit)
        print(client.scheduler_info()['services'])
        
        os.system("rm -fr " + storage_parms['outfile'])
        start = time.time()
        img_dataset = make_image(vis_dataset,grid_parms,storage_parms)
        time_to_calc_and_store = time.time() - start
        client.close()
         
        print('Time to create and store cube image',time_to_calc_and_store)
        bench_file.write(" %d %d %f \r\n" % (n_worker,threads_per_worker,time_to_calc_and_store))
        bench_file.close()
```
Parallelization is done using one worker with multiple threads, since only one node is used and memory is shared. If additional nodes are added, the number of workers should be increased by the same number (one worker per node).

The zarr chunks are 6.65 MB (time:23, baseline:903, chan:10, pol:2) in size, which is too small to provide meaningful work for a Dask thread. Therefore, the Dask chunk size is increased to 903.87 MB (time:782, baseline:903, chan:40, pol:2). Note that chunking is maintained on the channel axis, since the channels are imaged independently.
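The effect of this rechunking can be checked with a little bookkeeping. The stdlib sketch below (assuming 16-byte complex visibilities, which matches the sizes in the text) counts how many zarr chunks each Dask chunk aggregates and how many independent Dask chunks are available to the threads:

```python
import math

VIS_SHAPE = (782, 903, 7680, 2)   # (time, baseline, chan, pol)
ZARR_CHUNK = (23, 903, 10, 2)
DASK_CHUNK = (782, 903, 40, 2)
BYTES_PER_VIS = 16                 # assumption: complex128 visibilities


def chunks_per_dim(shape, chunk):
    """Number of chunks along each dimension (the last chunk may be partial)."""
    return [math.ceil(s / c) for s, c in zip(shape, chunk)]


# Every Dask chunk dimension is a whole multiple of the zarr chunk dimension
zarr_per_dask = math.prod(d // z for d, z in zip(DASK_CHUNK, ZARR_CHUNK))
n_dask_chunks = math.prod(chunks_per_dim(VIS_SHAPE, DASK_CHUNK))
dask_chunk_mb = math.prod(DASK_CHUNK) * BYTES_PER_VIS / 1e6

print(f"zarr chunks per Dask chunk: {zarr_per_dask}")
print(f"Dask chunks in total:       {n_dask_chunks}")
print(f"Dask chunk size:            {dask_chunk_mb:.2f} MB")
print(f"Dask chunks per thread (16 threads): {n_dask_chunks / 16:.0f}")
```

With 192 independent channel chunks and 16 threads, each thread has about 12 chunks to work through, while each chunk read pulls 136 zarr chunks from disk instead of one.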


### Benchmark Results

#### Timing 
The ngCASA implementation outperformed both CASA and the cube refactor on the larger-than-memory benchmark. The Fortran gridding code in CASA is slightly more efficient than the Numba just-in-time compiled Python code in ngCASA. However, ngCASA handles the chunked data more efficiently and has no intermediate steps where data is written to disk (CASA generates TempLattice files to store intermediate data on disk). With 16 threads/processes, ngCASA was 1.83 times faster than the cube refactor and 4.00 times faster than CASA. The cube refactor code was only tested with 8 to 16 MPI processes.

![bench_time](https://raw.githubusercontent.com/casangi/cngi_prototype/master/docs/images/bench_time.png)

#### Memory and CPU Monitoring
A shell command was used to record the resident set size (RSS) and the percentage CPU utilization every 50 seconds. The [resident set size](https://en.wikipedia.org/wiki/Resident_set_size) is the portion of memory occupied by a process that is held in main memory.

```sh
logpid() { while sleep 50; do  ps -e -o rss -o pcpu,command | grep python | grep -v grep ; echo xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx; done; }
logpid | tee /.lustre/cv/users/jsteeb/NGCASA/ngcasa.log
```
In the figure below, the memory usage is given for all the runs (note that the cube refactor was only run with 8-16 MPI processes). The number of threads/processes increases along the time axis. Both CASA and ngCASA kept memory usage below 30 GB, while the cube refactor memory usage spiked to just below 50 GB. The cube refactor not only used more of the available memory but also the most processing power, peaking at 1554% CPU utilization.

![bench_top](https://raw.githubusercontent.com/casangi/cngi_prototype/master/docs/images/bench_top.png)

Further improvements to the ngCASA implementation are being explored, such as changing the Dask chunking and [memory management](https://distributed.dask.org/en/latest/worker.html). The `~/.config/dask/distributed.yaml` file can be edited to allow Dask to use more of the available memory; the default file contains:
```yaml
distributed:
  worker:
    # Fractions of worker memory at which we take action to avoid memory blowup
    # Set any of the lower three values to False to turn off the behavior entirely
    memory:
      target: 0.60  # target fraction to stay below
      spill: 0.70  # fraction at which we spill to disk
      pause: 0.80  # fraction at which we pause worker threads
      terminate: 0.95  # fraction at which we terminate the worker
```
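With the 64 GB `memory_limit` given to the `Client` in the single-machine benchmark, these default fractions translate into absolute per-worker thresholds. A quick sketch of the arithmetic:

```python
MEMORY_LIMIT_GB = 64  # memory_limit given to the Client in the benchmark script

# Default worker memory fractions from distributed.yaml (shown above)
fractions = {"target": 0.60, "spill": 0.70, "pause": 0.80, "terminate": 0.95}

thresholds_gb = {name: f * MEMORY_LIMIT_GB for name, f in fractions.items()}
for name, gb in thresholds_gb.items():
    print(f"{name:>9}: {gb:.1f} GB")
```

Since ngCASA's memory usage stayed below 30 GB, it never reached even the 38.4 GB target. The same keys can also be set from Python with `dask.config.set`, e.g. `dask.config.set({"distributed.worker.memory.target": 0.8})`, before the workers are started.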

## Multi-node commercial cloud

The previous benchmark compares the strong scaling of ngCASA with existing CASA in the case of non-interactive operation confined to a single server. This next benchmark attempts to horizontally scale out the measurement by keeping the problem size fixed and adding nodes.

### Data and software

We don't need to download or convert any data if we use data that has already been converted and made available for download. We accomplish that through the Simple Storage Service ([S3](https://aws.amazon.com/s3/)) provided by Amazon Web Services. This service provides a means of uploading and storing arbitrary data in the form of objects, accessible via a web interface. `boto3` is the SDK provided by AWS to manage authentication and object access/manipulation, but because our bucket is configured for public access, the data can also be read anonymously with the `s3fs` library:

```python
import s3fs
s3 = s3fs.S3FileSystem(anon=True, requester_pays=False)
```

This library allows us to register an S3 object bucket with our Python process as if it were a file system. This can be convenient for certain operations, and allows us to treat the remotely stored object data as if it were a locally mounted block storage volume. There is a performance overhead associated with the network travel time of the data from the S3 servers to the `dask.distributed` cluster for processing, but this can be mitigated by choosing network-optimized instance types designed to minimize bandwidth limitations between AWS Elastic Compute Cloud (EC2) and S3.

There are some smaller visibility datasets available in our public bucket, each of which has been chunked along different dimensions (channel or time) for comparison against one another. These can be explored as follows:

```python
# Specify AWS S3 path
bucket = 'cngi-prototype-test-data/'

# Substitute any of the following
#ous, ddi = 'uid___A002_Xcad526_X4b6e', '27/'
#ous, ddi = 'uid___A002_Xcbc47c_Xe556', '5/'
ous, ddi = 'uid___A002_Xcfc232_X2eda', '16/'

dimension = 'chan'
#dimension = 'time'

exists = s3.isdir(bucket+ous+'_'+dimension+'_chunks.vis.zarr/global/')
print(f"It is {exists} that these data can be accessed like files")

contents = s3.listdir(bucket+ous+'_'+dimension+'_chunks.vis.zarr/'+ddi)
print(f"Here is an example object: {contents[0]}")
```

In addition to these small object groups, the same LSRK-converted ALMA Band 9/10 observations of NGC 6334I (ASDM `uid://A002/Xcb8a93/Xc096`) used in the previous single-machine benchmark were uploaded to S3. In cloud storage, this amounts to 108,307 objects totaling 80.2 GB of compressed data. These are the data used for the multi-node commercial cloud benchmark, enabling a consistent comparison with the single-machine case.


### Hardware Setup

Various APIs are available for coordinating access to a `dask.distributed` scheduler and workers running in the cloud.

* [Kubernetes](https://docs.dask.org/en/latest/setup/kubernetes.html) and [Helm](https://docs.dask.org/en/latest/setup/kubernetes-helm.html)
  * These Cloud Native Computing Foundation tools are widely used to manage containerized application software deployed to commercial and on-premises clusters of computing resources.
* [Jupyter extension](https://github.com/dask/dask-labextension)
  * For users with institutional access to a managed Jupyter notebook service running in the cloud or on-premises, distributed clusters can be created and destroyed directly from the Jupyter dashboard.
* [dask-jobqueue](https://jobqueue.dask.org/en/latest/)
  * Users with direct access to HPC clusters can use this API to interface directly with cluster resource schedulers, including SLURM, PBS, and HTCondor. This tool has been tested informally using NRAO's on-premises cvpost cluster and successfully coordinated demo computation using 960 CPUs (all available at the time). 
* [dask-gateway](https://gateway.dask.org/)
  * A tool created to centralize control of access to HPC job queueing, compatible with the previously listed approaches.
* [dask-cloudprovider](https://cloudprovider.dask.org/en/latest/)
  * Extension of the distributed client providing native integration with commercial clouds, using Docker containers to specify worker images.

#### Kubernetes/Helm

This method can dynamically create an ephemeral deployment for interactive use and quick adaptation. This approach is more useful for prototyping than depending on a centralized system (which would take longer to configure and potentially introduce unnecessary complications).

After configuring the AWS CLI with SSH keys and other prerequisites, and following [the documentation](https://kubernetes.io/docs/setup/production-environment/tools/kops/) to set up a Kubernetes (k8s) cluster using the `kops` command line tool, here is how to create a cluster with 84 total threads across 3 nodes, each node running four 7-vCPU worker pods (30 GB of RAM per pod).

```bash
export KOPS_CLUSTER_NAME=zinc.k8s.local
export KOPS_STATE_STORE=s3://cngi-kops-state
kops create cluster --name=${KOPS_CLUSTER_NAME} --node-count=3 --node-size=m5dn.8xlarge --master-size=m5dn.xlarge --zones=us-east-1a

# enable access, with an environment variable masking the path to the public key in this case
kops create secret --name zinc.k8s.local sshpublickey admin -i $REDACTED

kops get cluster
NAME		CLOUD	ZONES
zinc.k8s.local	aws	us-east-1a
```
Additional configuration of the k8s cluster is done using commands of the form:
```bash
kops edit ig --name=$KOPS_CLUSTER_NAME master-us-east-1a
kops edit ig --name=$KOPS_CLUSTER_NAME nodes
```
These open the instance group configuration files in an editor that allows reconfiguration, such as adding keys in the form of [CloudLabels](https://github.com/kubernetes/kops/blob/master/docs/labels.md) that help track expenditure, and changing the instance [image specification](https://github.com/kubernetes/kops/blob/master/docs/operations/images.md), e.g.,
```
spec:
  cloudLabels:
    budget: cngi
  image: 099720109477/ubuntu/images/hvm-ssd/ubuntu-focal-20.04-amd64-server-20200423
```
These are the same configuration files to edit when downsizing a cluster (change `maxSize` and `minSize`), switching to preemptible instances, or changing autoscaling behavior. Applying the changes with `kops update` modifies the AWS resources backing the cluster:
```
kops update cluster --name=${KOPS_CLUSTER_NAME} --yes
```
Much larger and more sophisticated configurations are routinely used to support the operations of large organizations, but it only takes a minute to spawn this relatively small amount of computing resources confined to a single Elastic Compute Cloud service region:
```
(base) -bash-4.2$ kops validate cluster --wait 10m
Validating cluster zinc.k8s.local

INSTANCE GROUPS
NAME			ROLE	MACHINETYPE	MIN	MAX	SUBNETS
master-us-east-1a	Master	m5dn.xlarge	1	1	us-east-1a
nodes			Node	m5dn.8xlarge	3	3	us-east-1a

NODE STATUS
NAME				ROLE	READY
ip-172-20-32-211.ec2.internal	node	True
ip-172-20-36-81.ec2.internal	master	True
ip-172-20-44-163.ec2.internal	node	True
ip-172-20-48-130.ec2.internal	node	True

Your cluster zinc.k8s.local is ready
```

The real advantage of using Kubernetes to provision cloud resources is the ease with which application software can be deployed using Helm. The Dask project maintains a public [chart](https://helm.sh/docs/topics/charts/) accessible through a public repository known as [the hub](https://hub.helm.sh/).
```
helm version
version.BuildInfo{Version:"v3.2.1", GitCommit:"fe51cd1e31e6a202cba7dead9552a6d418ded79a", GitTreeState:"clean", GoVersion:"go1.13.10"}

helm search hub dask
URL                                 	CHART VERSION	APP VERSION	DESCRIPTION                                       
https://hub.helm.sh/charts/dask/dask	4.1.12       	2.20.0     	Distributed computation in Python with task sch...
```
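Installing the standard deployment of the Dask Helm chart is as simple as adding the Dask chart repository, giving the release a name (in this case simply "dask"), and specifying the chart:
```
helm repo add dask https://helm.dask.org/
helm repo update
helm install dask dask/dask
```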
Then it is possible to monitor the status of the deployment. In short order all of the nodes in the cluster will contain pods running exposed services (including a central scheduler that manages the distribution of graph computation onto the worker processes).
```
kubectl get pods
NAME                             READY   STATUS              RESTARTS   AGE
dask-jupyter-7d685fdcc4-k8tzf    0/1     ContainerCreating   0          32s
dask-scheduler-756dff69d-sxr6f   1/1     Running             0          32s
dask-worker-6fcb84bbc9-g4jm5     0/1     ContainerCreating   0          32s
dask-worker-6fcb84bbc9-gthjb     1/1     Running             0          32s
dask-worker-6fcb84bbc9-jdk7l     1/1     Running             0          32s

kubectl get services
NAME             TYPE           CLUSTER-IP       EXTERNAL-IP                                                               PORT(S)                       AGE
dask-jupyter     LoadBalancer   100.69.110.209   aec1800a169b849c1b14a4a2a3794bfd-1736373731.us-east-1.elb.amazonaws.com   80:30641/TCP                  2m22s
dask-scheduler   LoadBalancer   100.66.110.37    ab3e4dff0ad0541a9b3786549c66be13-855413684.us-east-1.elb.amazonaws.com    8786:30792/TCP,80:30020/TCP   2m22s
kubernetes       ClusterIP      100.64.0.1       <none>                                                                    443/TCP                       54d
```


At this point we can in principle log into the Jupyter notebook server, perform interactive computation, and access the `dask.distributed` dashboard to monitor the progress of our tasks. However, the ngCASA application software is not part of the default Dask Helm chart, so users will see `ImportError`s when they try to load the modules.

One potential solution is offered by the Docker images published by Dask, which allow minor customizations through the environment variables `EXTRA_APT_PACKAGES`, `EXTRA_CONDA_PACKAGES`, and `EXTRA_PIP_PACKAGES`, whose values are appended to the `apt-get`, `conda install`, and `pip install` calls made when the container starts. These variables can be specified by passing an extra configuration file on the command line when managing the Helm deployment. If the deployment is modified in this way, it should install the extra Python libraries and also divide each node into the right number of pods.
```
cat config.yaml

# m5dn.8xlarge has 32 vCPU and 128 GiB RAM

worker:
  replicas: 12
  resources:
    limits:
        cpu: 7
        memory: 30G
    requests:
        cpu: 7
        memory: 30G
  env:
    - name: EXTRA_APT_PACKAGES
      value: build-essential swig libgfortran4 gcc python3-dev
    - name: EXTRA_CONDA_PACKAGES
      value: numba==0.48.0 icc_rt --channel conda-forge --channel numba
    - name: EXTRA_PIP_PACKAGES
      value: s3fs numpy==1.18.1 cngi-prototype==0.0.53 ngcasa==0.0.9
jupyter:
  enabled: true
  env:
    - name: EXTRA_APT_PACKAGES
      value: build-essential swig libgfortran4 gcc python3-dev
    - name: EXTRA_CONDA_PACKAGES
      value: numba==0.48.0 icc_rt --channel conda-forge --channel numba
    - name: EXTRA_PIP_PACKAGES
      value: s3fs numpy==1.18.1 cngi-prototype==0.0.53 ngcasa==0.0.9
```
The libraries `build-essential`, `swig`, and `libgfortran4` are `apt` dependencies for Python wheels that require a C++ compiler, such as `casatools`, and are not actually used here (Numba compiles using LLVM). Additional flags could optionally be passed to pip to ensure that a [distutils error](https://github.com/blockstack/stacks-blockchain/issues/504) doesn't prevent the built packages from installing, and that the installer can discover the NRAO-managed pip server that holds CASA 6. This is not necessary for the purposes of this benchmark.

Requesting only 7 vCPU and 30 GB of memory for each worker pod leaves room for the container orchestration [resource overhead](https://kubernetes.io/docs/concepts/overview/components/), i.e. the cluster orchestration and proxy services (requiring CPU on the order of millicores, or mCPU) that must run on each node in the cluster. With four worker pods on a 32-vCPU node, 4 vCPUs remain for these services.
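Choosing the pod size is a packing calculation: the number of identical pods per node is limited by whichever of CPU or memory runs out first, and the remainder is what is left for Kubernetes itself. A minimal sketch; the instance specifications (m5dn.8xlarge: 32 vCPU and 128 GiB; m5dn.24xlarge: 96 vCPU and 384 GiB) come from the AWS instance type listing, GiB and GB are treated interchangeably as in the text, and the memory Kubernetes actually reports as allocatable will be somewhat lower:

```python
def pods_per_node(node_vcpu, node_mem_gb, pod_vcpu, pod_mem_gb):
    """Largest number of identical pods whose requests fit on one node,
    plus the (vCPU, GB) left over for Kubernetes system services."""
    n = min(node_vcpu // pod_vcpu, node_mem_gb // pod_mem_gb)
    return n, (node_vcpu - n * pod_vcpu, node_mem_gb - n * pod_mem_gb)


# m5dn.8xlarge with 7 vCPU / 30 GB worker pods (this section)
print(pods_per_node(32, 128, 7, 30))   # (4, (4, 8))
# m5dn.24xlarge with 18 vCPU / 72 GB worker pods (used in the benchmark below)
print(pods_per_node(96, 384, 18, 72))  # (5, (6, 24))
```

The second case reproduces the 6 vCPU and 24 GB headroom quoted for the m5dn.24xlarge configuration later in this section.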

We can apply this modification to a running cluster, and Helm will increment the release revision and redeploy the application software. The same commands as before (`kubectl get $RESOURCE_NAME`) can monitor the status of this change, but since the configuration installs quite a bit of additional software, the pods may take a little longer to come up than with the default container.
```
helm upgrade dask dask/dask -f config.yaml
```
Looking at the workers page of the dashboard hosted at the scheduler service's external IP, we can actually watch the pods shut down and then cycle back online. Then we can confirm that our operation was successful with `help('modules')` on the running notebook server, or explicitly checking the build logs of the worker and jupyter pods.

##### Issues

One install error is found in the build logs on the dask-worker pods:
```
ERROR: sparse 0.10.0 has requirement numba>=0.49, but you'll have numba 0.48.0 which is incompatible.
```
This should be a minor issue because no major functionality of cngi-prototype or ngcasa depends on sparse (yet).

---

There is a more significant error in the Jupyter build:

We want the notebook server and the worker pods to share a common software environment so that commands can run on the notebook kernel as well as the distributed processes, but that is only half-working until [this pull request](https://github.com/dask/dask-docker/pull/110) is merged.

In principle this is not a blocker, because we should be able to specify the external URL of the scheduler (plus the default port number 8786) when creating the `dask.distributed` Client object, and then run the distributed benchmark from anywhere with an internet connection. In practice, the notebook server and worker images should be the same, so that we can run the benchmark on the k8s cluster itself and avoid the additional latency of issuing commands through our network pipe out to AWS.




#### Docker

> at this point you might want to build your own docker image that includes build-essential, and use that as your image - @TomAugspurger

There are perhaps more expedient ways to specify the container environment.

Breaking the `casatools` dependency in CNGI/ngCASA would allow for simpler configuration. Accounting for the `casatools` dependency (required by `cngi.conversion` as of July 2020) requires substantial configuration changes to the standard [dask-docker](https://github.com/dask/dask-docker) image.

The CASA team is already exploring the use of Docker to distribute our applications, especially in the context of macOS support and Apple's pending migration to custom ARM processors. Why not get two for the price of one?

Forking dask-docker to the casangi GitHub organization and adding our own customized `/opt/app/environment.yml` to be composed by the Dockerfiles could avoid the complications of modifying someone else's templates (at the cost of assuming the responsibility of managing our own). This would also allow us to change the python version in the `conda install` command to 3.6 for compatibility with `casatools`, include compiler tools in our default images, and make various other modifications as needed.

If we maintained a single consistent development environment stable across time, we could use [repo2docker](https://repo2docker.readthedocs.io/en/latest/howto/export_environment.html) to export the single-machine benchmark environment. However, this is not practical at present due to our rapid iteration during this prototyping experiment.

In order to try and work around the issue preventing the jupyter notebook server and dask worker images from sharing consistent software versions, we tried forking the repository and [changing the Dockerfile](https://github.com/amcnicho/dask-docker/pull/1/files) to get the `numba` (and `casatools`) build dependencies installed correctly. Testing these changes using [Docker Desktop](https://docs.docker.com/get-started/#docker-concepts) against a local clone of the modified dask-docker repository branch:
```
docker build --tag demo_fix:latest ./notebook/
docker run --publish 8888:8888 -e "EXTRA_APT_PACKAGES=build-essential swig libgfortran4 gcc python3-dev" -e "EXTRA_CONDA_PACKAGES=numba==0.48.0 icc_rt --channel conda-forge --channel numba" -e "EXTRA_PIP_PACKAGES=s3fs numpy==1.18.1 cngi-prototype==0.0.53 ngcasa==0.0.9" demo_fix:latest
```
This launches a container and forwards the ports from docker to the local host, where it is possible to access the running Jupyterlab server and verify the change. The fix works! But until the PR is accepted, it's not straightforward to use. Instead of publishing a duplicate Helm chart, we can push this working image to a public repository on the Docker hub and then pull it into the modified Helm configuration by changing the image specified in the configuration file we pass to the install command.

The updated section of the configuration file looks like this:
```
jupyter:
  enabled: true
  image:
    repository: "amcnicho/dask-docker-test"
    tag: dask-notebook
    pullPolicy: Always
```
Instead of `daskdev/dask-notebook`, we are running the jupyter service component of the deployment using the docker image that we published to the hub.

When the rest of the custom configuration is supplied, the result is a working configuration!
```
helm install dask dask/dask -f config.yaml
```
We can verify this by examining the logs of the container build with `kubectl logs $PODNAME`.

### Benchmark

#### CASA

Running CASA using MPI on more than one node has not been performed during telescope operations, so current scaling limitations are not well characterized. The default configuration for ALMA offline batch processing as of July 2020 is 1 (half) node and 8 processes.

#### ngCASA

For this next demonstration case we will adapt the same benchmark as used previously, but we will run it interactively through the jupyter notebook server.


Importing the following modules successfully confirms that our configuration is working:
```python
from cngi import dio
from dask.distributed import Client, config
config['scheduler_address']
```
In the next code cell, we import the modules required for the benchmark:
```python
import os, time
import xarray as xr
from cngi.vis import applyflags
from ngcasa.imaging import make_imaging_weight, calc_image_cell_size, make_image
import s3fs, zarr
```
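This should be all that is necessary to connect the Jupyter kernel to the worker processes, since `Client()` with no address falls back to the scheduler address found in the Dask configuration:
```python
client = Client()
client
```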
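We can also apply the thread-limiting environment variables from the single-machine case on every worker with `client.run`. This only affects libraries that read these variables after the call, so setting them in the `env` section of the worker configuration in `config.yaml` is likely more reliable:
```python
def limit_threads():
    # Runs inside each worker process and changes that process's environment
    for var in ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS"):
        os.environ[var] = "1"

client.run(limit_threads)
```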
Access the dataset from S3 instead of the local file system:
```python
s3 = s3fs.S3FileSystem(anon=True, requester_pays=False)
bucket = 'cngi-prototype-test-data/'
vis_data = 'combined_spw_uid___A002_Xcb8a93_Xc096.vis.zarr/0'
# convert the S3 key:value objects to a MutableMapping
cloud_store_file_vis = s3fs.S3Map(root=bucket+vis_data, s3=s3, check=False)
# pass the mapping as input to xarray's zarr interface
vis_dataset = xr.open_zarr(store=cloud_store_file_vis, chunks={'time':782,'chan':40}, consolidated=True, overwrite_encoded_chunks = True)
flagged_dataset = applyflags(vis_dataset, flags=['FLAG', 'FLAG_ROW'])
```
Set up the test to use the same properties as the single-machine case:
```python
#Make imaging weights
storage_parms = {'to_disk' : False}
 
imaging_weight_parms = {'weighting' : 'natural',
                        'chan_mode' : 'cube',
                        'imsize':[500,500],
                        'cell' : [0.02, 0.02]
                       }
dataset_with_weights = make_imaging_weight(flagged_dataset, imaging_weight_parms, storage_parms)
```
Apart from the S3 reads, we can run essentially the same code as in the previous benchmark; the main difference is that the result is not written back to disk after the computation.

We have already configured our k8s cluster with the desired properties of the test (CPU threads, memory limits, and number of worker processes per node).
```python
#Make dirty image
grid_parms = {'chan_mode':'cube',
              'imsize':[500,500],
              'cell' : [0.02, 0.02],
              'oversampling' : 100,
              'support' : 7,
              'fft_padding' :  1.2
             }
start = time.time()
img_dataset = make_image(dataset_with_weights,grid_parms,storage_parms)
img_dataset['DIRTY_IMAGE'].values
print(f"Time to calculate (but not store): {time.time() - start}")
print(f"output size in bytes: {img_dataset.nbytes}")
```

### Benchmark results

#### Timing

##### Fixed problem size

The original benchmark used a fixed setting of 16 threads per worker process. We ran the benchmark notebook using a Kubernetes cluster configured with [m5dn.24xlarge](https://aws.amazon.com/ec2/instance-types/m5/) instances (up to five worker process pods per node) running [Ubuntu 20.04](https://releases.ubuntu.com/20.04/) server images. The zarr chunks are small (6.65 MB uncompressed, `time:23, baseline:903, chan:10, pol:2`), so we varied the size and number of Dask chunks to keep the processors from starving for work, starting from a size of 903.87 MB (`time:782, baseline:903, chan:40, pol:2`) and decrementing along the time and channel dimensions to keep the number of Dask chunks at roughly 3x the number of vCPUs in the cluster.
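The exact decrement schedule is not spelled out here, so the following is only a hypothetical sketch of the "~3x the number of vCPUs" rule: `pick_chunks` is an illustrative helper, it shrinks the channel axis before the time axis (an assumption), and it keeps every Dask chunk a whole number of zarr chunks so that no zarr chunk is split between tasks. It does not necessarily reproduce the chunk sizes in the table below:

```python
import math

N_TIME, N_CHAN = 782, 7680        # time and channel lengths of the visibility data
ZARR_TIME, ZARR_CHAN = 23, 10     # zarr chunk sizes along those axes


def pick_chunks(n_vcpus, factor=3):
    """Hypothetical helper: shrink (time, chan) Dask chunks, in steps of one zarr
    chunk, until there are at least factor * n_vcpus chunks."""
    target = factor * n_vcpus
    time_chunk, chan_chunk = N_TIME, 40   # start from the 903.87 MB chunks

    def n_chunks():
        return math.ceil(N_TIME / time_chunk) * math.ceil(N_CHAN / chan_chunk)

    # Shrink the channel axis first, then the time axis, never below one zarr chunk
    while n_chunks() < target and chan_chunk > ZARR_CHAN:
        chan_chunk -= ZARR_CHAN
    while n_chunks() < target and time_chunk > ZARR_TIME:
        time_chunk -= ZARR_TIME
    return {"time": time_chunk, "chan": chan_chunk}


for vcpus in (18, 90, 1080):
    print(vcpus, pick_chunks(vcpus))
```

The returned dictionary has the same form as the `chunks` argument passed to `xr.open_zarr` in the benchmark code.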

To minimize the total cost of ownership of the AWS resources during the test, we adopted a setting of 18 threads and 72 GB of RAM per worker process, with up to five worker pods per instance (leaving 6 vCPU and 24 GB for k8s pod management). To resize the cluster for testing at different scales, all we had to do was set the desired number of worker `replicas` in the Helm configuration file and redeploy using Helm:
```
emacs -nw config.yaml
helm upgrade dask dask/dask -f config.yaml
```
The same scheduler and dashboard services remain exposed during this process, demonstrating the capability to dynamically scale operational workloads.

| Time (s) | vCPUs | Total RAM | Dask chunk size | Nodes | Pods | Processes |
|---|:---:|:---:|:---:|:---:|:---:|:---:|
|  | 18 | 72 GB | 903.87 MB | 1 | 1 | 1 |
|  | 54 | 216 GB |  | 1 | 3 | 3 |
|  | 90 | 360 GB |  | 1 | 5 | 5 |
|  | 108 | 432 GB |  | 2 | 6 | 6 |
|  | 144 | 576 GB |  | 2 | 8 | 8 |
|  | 180 | 720 GB |  | 2 | 10 | 10 |
|  | 360 | 1.44 TB |  | 3 | 20 | 20 |
|  | 720 | 2.88 TB |  | 4 | 40 | 40 |
|  | 1080 | 4.32 TB | 6.65 MB | 6 | 60 | 60 |

##### Increased problem size

As a bonus, tests were also conducted using an oversampling of 200 and a support of 71 to simulate increasingly compute-limited conditions, with the following results:

1. 6986 seconds for 84 threads on 12 processes divided between 3 nodes

2. 2533 seconds for 280 threads on 40 processes divided between 10 nodes

3. 1781 seconds for 672 threads on 96 processes divided between 24 nodes
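Because these runs share a fixed problem size, their strong-scaling efficiency can be read off directly. No single-thread timing is given for this configuration, so this sketch takes the 84-thread run as the baseline:

```python
# (threads, seconds) for the oversampling=200, support=71 runs listed above
runs = [(84, 6986), (280, 2533), (672, 1781)]

base_threads, base_seconds = runs[0]
results = []
for threads, seconds in runs:
    speedup = base_seconds / seconds      # relative to the 84-thread run
    ideal = threads / base_threads        # perfect strong scaling
    efficiency = speedup / ideal
    results.append((threads, speedup, efficiency))
    print(f"{threads:4d} threads: speedup {speedup:.2f} (ideal {ideal:.2f}), "
          f"efficiency {efficiency:.0%}")
```

Relative to 84 threads, the efficiency is about 83% at 280 threads and about 49% at 672 threads.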

#### Memory and CPU Monitoring

Interactive performance characterization was possible using the optionally installed [Bokeh](https://docs.bokeh.org/en/latest/index.html) dashboard, which supplied the execution graph and task stream diagrams in addition to the communication ring plot.

## Analysis

Amdahl's argument
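Amdahl's law models the speedup on p processors as S(p) = 1 / ((1 - f) + f / p), where f is the parallelizable fraction of the run time, so the speedup can never exceed 1 / (1 - f). Inverting it gives f = (1 - 1/S) / (1 - 1/p), which can be estimated from the increased-problem-size timings, again relative to the 84-thread run (so p is a ratio of thread counts). A minimal sketch; if the estimated f changes noticeably with p, a fixed serial fraction alone does not explain the observed scaling, and overheads that grow with cluster size may also be involved:

```python
def amdahl_speedup(f, p):
    """Amdahl's law: speedup on p workers when a fraction f of the work parallelizes."""
    return 1.0 / ((1.0 - f) + f / p)


def parallel_fraction(speedup, p):
    """Invert Amdahl's law for f, given a measured speedup on p workers (p > 1)."""
    return (1.0 - 1.0 / speedup) / (1.0 - 1.0 / p)


# Timings from the oversampling=200, support=71 runs; the 84-thread run is the baseline
base_threads, base_seconds = 84, 6986
estimates = {}
for threads, seconds in [(280, 2533), (672, 1781)]:
    p = threads / base_threads
    f = parallel_fraction(base_seconds / seconds, p)
    estimates[threads] = f
    print(f"{threads} threads: f ~ {f:.3f}, Amdahl limit 1/(1-f) ~ {1 / (1 - f):.1f}x")
```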