# On-demand interactive Dask-based large-scale data analysis on Andes

This notebook demonstrates on-demand, interactive use of Dask on Andes through Slurm.
The cell below documents the underlying behavior of the Dask cluster.

## Acquiring Dask cluster

### Non-conflicting dask-scheduler instance tied to a notebook

The proposed approach for gears is to spawn a single Dask cluster per notebook.  Each notebook has a corresponding Dask scheduler anchored on the node where the notebook is running.
The scheduler must not collide with anyone else's, so it uses random ports for both the scheduler and the dashboard.  With gears, the notebook can run on a login node shared with other users, and even your own other notebooks can get in the way.

The first part of the cell below demonstrates how to do this.
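
The cell's `random.randint` call picks a port without checking whether it is already in use. As a hedged alternative, the sketch below asks the operating system for a currently free port by binding to port 0. The helper name `find_free_port` is hypothetical and not part of gears or Dask, and another process can still claim the port between this check and the scheduler start, so it only reduces the chance of a collision.

```python
import socket


def find_free_port(host=""):
    """Return a TCP port that was free at the time of the call.

    Hypothetical helper for illustration: binding to port 0 lets the
    operating system choose an unused port, which is then released again.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]


dashboard_port = find_free_port()
print(f"/proxy/{dashboard_port}")
```

The resulting `dashboard_port` could be passed to `scheduler_options` in the same way as the randomly generated one.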

### Spawning the worker pool

After you acquire the cluster and the client for the notebook, call `cluster.scale(jobs=1)` to request the worker pool that executes the subsequent cells.

The recommendation is to call `cluster.scale(jobs=<up to 4>)`, run the computation, and then call `cluster.scale(jobs=0)` to release the workers and save node hours.

Currently, the underlying SLURMCluster object creates one Slurm job per unit of scale, and each job is limited to a 1 node allocation for its worker pool.  Scaling up to 4 jobs therefore means 4 Slurm jobs.  Note that 4 is the limit on concurrently running Slurm jobs on the Andes cluster.  If more nodes are needed, each job would have to use a job launcher such as `srun`, but unfortunately the current SLURMCluster is not compatible with that.

In general, assume the notebook will be run sequentially from top to bottom, and scale up and scale down explicitly whenever possible.
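
One way to make the scale-up and scale-down pairing hard to forget is a small context manager. This is a minimal sketch, not part of dask-jobqueue or gears: the name `scaled` and the `MAX_JOBS` constant are hypothetical, and the only assumption about the cluster object is that it offers `scale(jobs=...)` as used in this notebook.

```python
from contextlib import contextmanager

# Assumption: mirrors the limit of 4 concurrently running Slurm jobs described above
MAX_JOBS = 4


@contextmanager
def scaled(cluster, jobs=1):
    """Scale the cluster up for the duration of a block, then back to zero."""
    if not 1 <= jobs <= MAX_JOBS:
        raise ValueError(f"jobs must be between 1 and {MAX_JOBS}, got {jobs}")
    cluster.scale(jobs=jobs)
    try:
        yield cluster
    finally:
        # Runs even if the computation raises, so node hours are not wasted
        cluster.scale(jobs=0)


# Intended usage inside a notebook cell (requires a live cluster and dataframe):
# with scaled(cluster, jobs=4):
#     value = df['total_power'].count().compute()
```

Because the scale-down sits in a `finally` clause, a failing cell still releases its workers.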

If cleanup is not done explicitly, the scheduler is killed when the notebook's kernel is killed, and the on-demand workers are then still bounded by the default 1 hour walltime defined underneath.

### Comments on dask-labextension's cluster usage

Using dask-labextension to spawn a Dask cluster is inadvisable.  One of the aims is to be able to run the Jupyter notebooks in a batch environment (cells running sequentially) once development is finished.  Using the lab extension requires copying arbitrary code into the notebooks, which would not be reproducible.

In [23]:
# Standard preamble to use the Slurm cluster
import random
from dask_jobqueue import SLURMCluster
from distributed import Client

# Slurm cluster submission to the Andes cluster
# The cluster configuration is in ./etc/dask/dask.yml with sensible defaults
# Refer to the "dask.jobqueue.slurm"
# Pick a random dashboard port to reduce the chance of colliding with other users
dashboard_port = random.randint(10000, 60000)
cluster = SLURMCluster(
    scheduler_options={"dashboard_address": f":{dashboard_port}"}
)
# We print out the address you copy into the dask-labextension
print("Dashboard address for the dask-labextension")
print(f"/proxy/{dashboard_port}")

# Create the client object
client = Client(cluster)
client

Dashboard address for the dask-labextension
/proxy/50031


Client: Scheduler: tcp://10.43.202.87:41003, Dashboard: http://10.43.202.87:50031/status
Cluster: Workers: 0, Cores: 0, Memory: 0 B


# Computation using the cluster

## Initial data load, repartition, indexing & persist

Below is an example of an ephemeral pre-loading step for a "large" dataset to be used in subsequent analysis.  Load only the partitions you need, repartition to an adequate partition size (e.g., 100MB), and call `set_index` so that Dask knows the divisions of the partitions.
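
To get a feel for what a 100MB partition size implies, the arithmetic can be sketched as below. This is only a back-of-the-envelope estimate: `estimate_npartitions` is a hypothetical helper, the 50 GB figure is a made-up example rather than the size of the dataset used here, "100MB" is assumed to mean 100,000,000 bytes, and the partition count Dask actually produces can differ.

```python
def estimate_npartitions(total_bytes, partition_bytes=100_000_000):
    """Estimate how many partitions a dataset needs at a target partition size.

    Hypothetical helper: plain ceiling division, with at least one partition.
    """
    if total_bytes < 0 or partition_bytes <= 0:
        raise ValueError("sizes must be non-negative, and the target size positive")
    return max(1, -(-total_bytes // partition_bytes))


# A hypothetical 50 GB in-memory dataset at 100 MB per partition
print(estimate_npartitions(50_000_000_000))  # 500
```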

Once this is done, it is wise to call `df = client.persist(df)` and use the persisted Dask dataframe in the subsequent cells.  Alternatively, export the repartitioned and indexed dataset to your scratch space (which should be on GPFS) using `to_parquet`.

Here, you *mostly* don't need to worry about the total DRAM of your Dask cluster.  The gears setup for the Andes cluster uses GPFS (/gpfs/alpine/scratch/<your_id>/.gears/dask/dask-worker-space) as spill space, and the data you persist is temporarily stored there.  However, a worker can die along with its portion of the persisted data, in which case the job fails with errors.  In practice, an explicit `to_parquet` is more stable, at the cost of some data loading time.

You can watch data being spilled to the scratch space in Dask's dashboard (via dask-labextension): orange bars appear in the "Dask Nbytes" dashboard screen when a spill happens.

In [28]:
%%time
import os
import dask.dataframe as dd

# Ensure temporary scratch space for this example
SCRATCH = f"{os.environ['MEMBERWORK']}/gen150/.gears/gears/examples"

DATASET = '/gpfs/alpine/gen150/proj-shared/data/lake.dev/openbmc.summit.raw/openbmc-202004*-*.parquet'
PRECOMPUTE = f"{SCRATCH}/total_power.parquet"
os.makedirs(SCRATCH, exist_ok=True)

# Data preparation: load, repartition, and set_index only if the
# precomputed dataset does not exist yet
if not os.access(PRECOMPUTE, os.F_OK):
    # Scale up right before running compute
    # Currently, 4 jobs is all you can do on the Andes cluster
    cluster.scale(jobs=4)
    dd.read_parquet(
        DATASET, engine='fastparquet', index=False, gather_statistics=False,
        columns=['timestamp', 'total_power'],
    ).repartition(
        partition_size="100MB"
    ).set_index(
        'timestamp'
    ).to_parquet(PRECOMPUTE, engine='fastparquet')
    # Scale back down to release the nodes
    cluster.scale(jobs=0)
# dd.read_parquet is lazy, so this step doesn't need a cluster
df = dd.read_parquet(PRECOMPUTE, engine='fastparquet')
df

CPU times: user 96.2 ms, sys: 38.5 ms, total: 135 ms
Wall time: 134 ms


| npartitions=1347 | timestamp | total_power |
|---|---|---|
| | datetime64[ns] | float32 |
| | ... | ... |
| ... | ... | ... |
| | ... | ... |
| | ... | ... |


## Subsequent computation utilizing the persisted dataframe

With the dataframe persisted, subsequent cells benefit from the optimized partition size and the sorted index held in the ephemeral, on-demand Dask cluster.

In [20]:
%%time
# Look at the number of records you're dealing with
# This example uses about 10.8 billion records
cluster.scale(jobs=4)
value = df['total_power'].count().compute()
value

CPU times: user 2.94 s, sys: 178 ms, total: 3.12 s
Wall time: 20.2 s


10849103948

In [8]:
%%time
# Calculation utilizing the persisted dataset should be quicker
cluster.scale(jobs=4)
value = df['total_power'].std().compute()
value

CPU times: user 2.55 s, sys: 125 ms, total: 2.67 s
Wall time: 6.26 s


267.99948

In [9]:
%%time
cluster.scale(jobs=4)
value = df['total_power'].mean().compute()
value

CPU times: user 3.2 s, sys: 132 ms, total: 3.34 s
Wall time: 9.06 s


646.2571924278146

In [10]:
# This is how you retrieve the cluster logs for debugging
cluster.get_logs()

# Cleaning up

Clean up the cluster.
It will be cleaned up automatically when the kernel dies, but it is a good idea to do this explicitly.

In [26]:
cluster.scale(jobs=0)

In [27]:
client.close()
cluster.close()
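
The cleanup cells above can also be folded into a single helper so that each step still runs when an earlier one fails. This is a sketch under assumptions: the name `shutdown` is hypothetical, and it relies only on the `scale(jobs=0)` and `close()` calls already used in this notebook.

```python
def shutdown(client, cluster):
    """Release the workers, then close the client and the cluster.

    Hypothetical helper: nested try/finally ensures that a failure in one
    step does not prevent the remaining cleanup steps from running.
    """
    try:
        cluster.scale(jobs=0)
    finally:
        try:
            client.close()
        finally:
            cluster.close()


# Intended usage as the last cell of the notebook:
# shutdown(client, cluster)
```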