In [1]:
import dask.distributed as dd
from dask.distributed import Client, LocalCluster, progress
from dask_jobqueue import PBSCluster
from dask.distributed import get_worker
import os

In [2]:
# create the user directory if it doesn't already exist
! mkdir -p /scratch/vp91/$USER

In [3]:
# set the path 
user = os.getenv('USER', 'default value')
path = '/scratch/vp91/'+user
print(path)

/scratch/vp91/jxj900


In [4]:
# The jupyter notebook is launched from your $HOME directory.
# Change the working directory the user directory under /scratch/vp91
os.chdir(os.path.expandvars(path))

In [5]:
# Make sure the python we use is from the venv
os.environ['DASK_PYTHON'] = '/scratch/vp91/Training-Venv/parallel_python/bin/python3'

In [10]:
# Make sure all the modules are loaded.
# It is essential that we use the same python and library for all aspects of dask
# If we dont activate the venv then the workers may have a different versions of libraries
setup_commands = ["module load python3/3.11.0", "module load cmake/3.16.2", "source /scratch/abc/Training-Venv/parallel_python/bin/activate"]

In [11]:
# Gadi use custom PBS directives
# So some of the default values to launch a PBS job through Dask call will not work in Gadi
# Any directive specific to gadi should be mentioned here.
# refer : https://opus.nci.org.au/display/Help/Gadi+Quick+Reference+Guide
extra = ['-q normal',
         '-P vp91', 
         '-l ncpus=48', 
         '-l mem=192GB']

In [12]:
# walltime: Walltime for each worker job.
# cores: Total number of cores per job.
# shebang: Path to desired interpreter for your batch submission script.
# job_extra_directives: List of other PBS options. Each option will be prepended with the #PBS prefix.
# local_directory: Dask worker local directory for file spilling.
# job_directives_skip: Directives to skip in the generated job script header. Directives lines containing 
#                      the specified strings will be removed. Directives added by job_extra_directives 
#                      won’t be affected.
# interface: Network interface like ‘eth0’ or ‘ib0’. This will be used both for the Dask scheduler and 
#            the Dask workers interface
# job_script_prologue: Commands to add to script before launching worker
# python: Python executable used to launch Dask workers. Defaults to the Python that is submitting these jobs



cluster = PBSCluster(walltime="00:20:00", 
                     cores=48, 
                     memory="192GB",
                     shebang='#!/usr/bin/env bash',
                     job_extra_directives=extra, 
                     local_directory='$TMPDIR', 
                     job_directives_skip=["select"], 
                     interface="ib0",
                     job_script_prologue=setup_commands,
                     python=os.environ["DASK_PYTHON"])

Perhaps you already have a cluster running?
Hosting the HTTP server on port 42625 instead
2024-12-05 14:11:37,167 - bokeh.server.protocol_handler - ERROR - error handling message
 message: Message 'PULL-DOC-REQ' content: {} 
 error: SerializationError("can't serialize <class 'function'>")
Traceback (most recent call last):
  File "/scratch/vp91/Training-Venv/parallel_python/lib/python3.11/site-packages/bokeh/server/protocol_handler.py", line 94, in handle
    work = await handler(message, connection)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/scratch/vp91/Training-Venv/parallel_python/lib/python3.11/site-packages/bokeh/server/session.py", line 94, in _needs_document_lock_wrapper
    result = func(self, *args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/scratch/vp91/Training-Venv/parallel_python/lib/python3.11/site-packages/bokeh/server/session.py", line 257, in _handle_pull
    return connection.protocol.create('PULL-DOC-REPLY', message.header['msgid'], self

In [13]:
print(cluster.job_script())

#!/usr/bin/env bash

#PBS -N dask-worker
#PBS -l walltime=00:20:00
#PBS -q normal
#PBS -P vp91
#PBS -l ncpus=48
#PBS -l mem=192GB
module load python3/3.11.0
module load cmake/3.16.2
source /scratch/abc/Training-Venv/parallel_python/bin/activate
/scratch/vp91/Training-Venv/parallel_python/bin/python3 -m distributed.cli.dask_worker tcp://10.6.42.31:40023 --name dummy-name --nthreads 6 --memory-limit 22.35GiB --nworkers 8 --nanny --death-timeout 60 --local-directory $TMPDIR --interface ib0



In [14]:
# create a cluster with 2 nodes
cluster.scale(jobs=2)

In [19]:
# Verify the workers have been allocated as expected
!qstat

Job id                 Name             User              Time Use S Queue
---------------------  ---------------- ----------------  -------- - -----
130162072.gadi-pbs     sys-dashboard-s* jxj900            00:00:27 R normal-exec     
130163622.gadi-pbs     dask-worker      jxj900            00:00:00 R normal-exec     
130163623.gadi-pbs     dask-worker      jxj900            00:00:00 R normal-exec     


In [None]:
cluster

In [20]:
# create the client
client = Client(cluster)

In [21]:
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: /proxy/42625/status,

0,1
Dashboard: /proxy/42625/status,Workers: 16
Total threads: 96,Total memory: 357.60 GiB

0,1
Comm: tcp://10.6.42.31:40023,Workers: 16
Dashboard: /proxy/42625/status,Total threads: 96
Started: 2 minutes ago,Total memory: 357.60 GiB

0,1
Comm: tcp://10.6.77.16:33837,Total threads: 6
Dashboard: /proxy/45747/status,Memory: 22.35 GiB
Nanny: tcp://10.6.77.16:44573,
Local directory: /jobfs/130163623.gadi-pbs/dask-scratch-space/worker-fkldbuwq,Local directory: /jobfs/130163623.gadi-pbs/dask-scratch-space/worker-fkldbuwq

0,1
Comm: tcp://10.6.77.16:33063,Total threads: 6
Dashboard: /proxy/35431/status,Memory: 22.35 GiB
Nanny: tcp://10.6.77.16:44243,
Local directory: /jobfs/130163623.gadi-pbs/dask-scratch-space/worker-xwqzlqwq,Local directory: /jobfs/130163623.gadi-pbs/dask-scratch-space/worker-xwqzlqwq

0,1
Comm: tcp://10.6.77.16:42111,Total threads: 6
Dashboard: /proxy/37689/status,Memory: 22.35 GiB
Nanny: tcp://10.6.77.16:44223,
Local directory: /jobfs/130163623.gadi-pbs/dask-scratch-space/worker-oo19vkwk,Local directory: /jobfs/130163623.gadi-pbs/dask-scratch-space/worker-oo19vkwk

0,1
Comm: tcp://10.6.77.16:44353,Total threads: 6
Dashboard: /proxy/45989/status,Memory: 22.35 GiB
Nanny: tcp://10.6.77.16:45475,
Local directory: /jobfs/130163623.gadi-pbs/dask-scratch-space/worker-7fnbn4lp,Local directory: /jobfs/130163623.gadi-pbs/dask-scratch-space/worker-7fnbn4lp

0,1
Comm: tcp://10.6.77.16:44545,Total threads: 6
Dashboard: /proxy/34965/status,Memory: 22.35 GiB
Nanny: tcp://10.6.77.16:44089,
Local directory: /jobfs/130163623.gadi-pbs/dask-scratch-space/worker-ol8olzrq,Local directory: /jobfs/130163623.gadi-pbs/dask-scratch-space/worker-ol8olzrq

0,1
Comm: tcp://10.6.77.16:40275,Total threads: 6
Dashboard: /proxy/41397/status,Memory: 22.35 GiB
Nanny: tcp://10.6.77.16:33353,
Local directory: /jobfs/130163623.gadi-pbs/dask-scratch-space/worker-7ur6egk_,Local directory: /jobfs/130163623.gadi-pbs/dask-scratch-space/worker-7ur6egk_

0,1
Comm: tcp://10.6.77.16:34117,Total threads: 6
Dashboard: /proxy/36941/status,Memory: 22.35 GiB
Nanny: tcp://10.6.77.16:41875,
Local directory: /jobfs/130163623.gadi-pbs/dask-scratch-space/worker-zakgxe28,Local directory: /jobfs/130163623.gadi-pbs/dask-scratch-space/worker-zakgxe28

0,1
Comm: tcp://10.6.77.16:45025,Total threads: 6
Dashboard: /proxy/38525/status,Memory: 22.35 GiB
Nanny: tcp://10.6.77.16:45433,
Local directory: /jobfs/130163623.gadi-pbs/dask-scratch-space/worker-gbloahkg,Local directory: /jobfs/130163623.gadi-pbs/dask-scratch-space/worker-gbloahkg

0,1
Comm: tcp://10.6.78.20:33933,Total threads: 6
Dashboard: /proxy/35537/status,Memory: 22.35 GiB
Nanny: tcp://10.6.78.20:33635,
Local directory: /jobfs/130163622.gadi-pbs/dask-scratch-space/worker-eh21_v11,Local directory: /jobfs/130163622.gadi-pbs/dask-scratch-space/worker-eh21_v11

0,1
Comm: tcp://10.6.78.20:36677,Total threads: 6
Dashboard: /proxy/41213/status,Memory: 22.35 GiB
Nanny: tcp://10.6.78.20:44797,
Local directory: /jobfs/130163622.gadi-pbs/dask-scratch-space/worker-4dc64_az,Local directory: /jobfs/130163622.gadi-pbs/dask-scratch-space/worker-4dc64_az

0,1
Comm: tcp://10.6.78.20:45033,Total threads: 6
Dashboard: /proxy/43207/status,Memory: 22.35 GiB
Nanny: tcp://10.6.78.20:45701,
Local directory: /jobfs/130163622.gadi-pbs/dask-scratch-space/worker-0cbqxla9,Local directory: /jobfs/130163622.gadi-pbs/dask-scratch-space/worker-0cbqxla9

0,1
Comm: tcp://10.6.78.20:38807,Total threads: 6
Dashboard: /proxy/40145/status,Memory: 22.35 GiB
Nanny: tcp://10.6.78.20:39961,
Local directory: /jobfs/130163622.gadi-pbs/dask-scratch-space/worker-q0itvfwm,Local directory: /jobfs/130163622.gadi-pbs/dask-scratch-space/worker-q0itvfwm

0,1
Comm: tcp://10.6.78.20:39071,Total threads: 6
Dashboard: /proxy/41279/status,Memory: 22.35 GiB
Nanny: tcp://10.6.78.20:35509,
Local directory: /jobfs/130163622.gadi-pbs/dask-scratch-space/worker-nepwf93t,Local directory: /jobfs/130163622.gadi-pbs/dask-scratch-space/worker-nepwf93t

0,1
Comm: tcp://10.6.78.20:33871,Total threads: 6
Dashboard: /proxy/40321/status,Memory: 22.35 GiB
Nanny: tcp://10.6.78.20:33729,
Local directory: /jobfs/130163622.gadi-pbs/dask-scratch-space/worker-mxk2fzjl,Local directory: /jobfs/130163622.gadi-pbs/dask-scratch-space/worker-mxk2fzjl

0,1
Comm: tcp://10.6.78.20:40271,Total threads: 6
Dashboard: /proxy/41137/status,Memory: 22.35 GiB
Nanny: tcp://10.6.78.20:34099,
Local directory: /jobfs/130163622.gadi-pbs/dask-scratch-space/worker-5gjyeds_,Local directory: /jobfs/130163622.gadi-pbs/dask-scratch-space/worker-5gjyeds_

0,1
Comm: tcp://10.6.78.20:45881,Total threads: 6
Dashboard: /proxy/45229/status,Memory: 22.35 GiB
Nanny: tcp://10.6.78.20:41441,
Local directory: /jobfs/130163622.gadi-pbs/dask-scratch-space/worker-mxhejflc,Local directory: /jobfs/130163622.gadi-pbs/dask-scratch-space/worker-mxhejflc


cluster.get_logs()

In [22]:
# A simple test function
def slow_increment(x):
    return x+1

In [24]:
# Submit the work to the Dask cluster
futures = client.submit(slow_increment, 5000)

In [25]:
futures

In [26]:
futures.result()

5001

In [29]:
import dask.dataframe as df

ddf = df.read_csv(
    os.path.join("distributed-computing-in-dask/data", "nycflights", "*.csv"),
    dtype={"TailNum": str, "CRSElapsedTime": float, "Cancelled": bool},
)

In [30]:
len(ddf)

2611892