# Dask Adaptive Scaling

In [1]:
from dask_jobqueue import PBSCluster

import dask

dask.config.set(
    {
        'distributed.dashboard.link': 'https://jupyterhub.hpc.ucar.edu/stable/user/{USER}/proxy/{port}/status'
    }
)

<dask.config.set at 0x2add1f61a2e0>

In [2]:
cluster = PBSCluster(
    cores=2,
    processes=1,
    queue="casper",
    walltime="00:30:00",
    resource_spec="select=1:ncpus=1:mem=10GB",
)
cluster

Perhaps you already have a cluster running?
Hosting the HTTP server on port 34929 instead


VBox(children=(HTML(value='<h2>PBSCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    .d…

In [3]:
!qstat -u abanihi -l

240546.casper-* abanihi  jhublog* STDIN      217212   1   1    4gb 720:0 R 214:2
251037.casper-* abanihi  jhublog* STDIN      238092   1   1    4gb 720:0 R 137:5
279371.casper-* abanihi  htc      dask-work* 194610   1   1   10gb 00:30 R 00:00
279372.casper-* abanihi  htc      dask-work* 101838   1   1   10gb 00:30 R 00:00
279373.casper-* abanihi  htc      dask-work* 134639   1   1   10gb 00:30 R 00:00
279374.casper-* abanihi  htc      dask-work* 145193   1   1   10gb 00:30 R 00:00
279375.casper-* abanihi  htc      dask-work* 145273   1   1   10gb 00:30 R 00:00


## Batch Job Script

In [4]:
print(cluster.job_script())

#!/usr/bin/env bash

#PBS -N dask-worker
#PBS -q casper
#PBS -A NIOW0001
#PBS -l select=1:ncpus=1:mem=10GB
#PBS -l walltime=00:30:00
#PBS -e /glade/scratch/abanihi/
#PBS -o /glade/scratch/abanihi/

/glade/work/abanihi/opt/miniconda/envs/playground/bin/python -m distributed.cli.dask_worker tcp://10.12.206.47:40433 --nthreads 2 --memory-limit 101.51GiB --name dummy-name --nanny --death-timeout 60 --local-directory /glade/scratch/abanihi --interface ib0 --protocol tcp://



## Cluster Scaling APIs

In [5]:
cluster.scale(5)

In [None]:
cluster.scale(jobs=1)

In [6]:
cluster.adapt(minimum_jobs=1, maximum_jobs=20)

<distributed.deploy.adaptive.Adaptive at 0x2add42e0acd0>

## Connect cluster to the client

In [7]:
from distributed import Client

client = Client(cluster)

## Run some computation

In [8]:
import dask.array as da

x = da.random.random((200, 10_000, 5_000), chunks=(20, 1_000, 1_000))
x

Unnamed: 0,Array,Chunk
Bytes,74.51 GiB,152.59 MiB
Shape,"(200, 10000, 5000)","(20, 1000, 1000)"
Count,500 Tasks,500 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 74.51 GiB 152.59 MiB Shape (200, 10000, 5000) (20, 1000, 1000) Count 500 Tasks 500 Chunks Type float64 numpy.ndarray",5000  10000  200,

Unnamed: 0,Array,Chunk
Bytes,74.51 GiB,152.59 MiB
Shape,"(200, 10000, 5000)","(20, 1000, 1000)"
Count,500 Tasks,500 Chunks
Type,float64,numpy.ndarray


In [9]:
y = x.std(axis=0)
y = y.persist()
y

Unnamed: 0,Array,Chunk
Bytes,381.47 MiB,7.63 MiB
Shape,"(10000, 5000)","(1000, 1000)"
Count,50 Tasks,50 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 381.47 MiB 7.63 MiB Shape (10000, 5000) (1000, 1000) Count 50 Tasks 50 Chunks Type float64 numpy.ndarray",5000  10000,

Unnamed: 0,Array,Chunk
Bytes,381.47 MiB,7.63 MiB
Shape,"(10000, 5000)","(1000, 1000)"
Count,50 Tasks,50 Chunks
Type,float64,numpy.ndarray
