In [1]:
from dask.distributed import Client, Scheduler, Nanny, SpecCluster
from dask_cuda.worker_spec import worker_spec
import dask
import psutil
import multiprocessing

In [2]:
# Probe the host: total system memory as reported by psutil and the CPU count
# as reported by multiprocessing. Both values drive the per-worker sizing later on.
memory_count = psutil.virtual_memory().total
cpu_count = multiprocessing.cpu_count()

print(f"CPU count: {cpu_count}")
print(f"System memory: {memory_count}")

CPU count: 16
System memory: 134746021888


In [3]:
# Cluster layout, one entry per worker type:
#   "scale"     - number of workers of that type to start
#   "resources" - per-type settings; a "CUDA_VISIBLE_DEVICES" list marks the
#                 type as a GPU worker when the worker mapping is built
specs = {
    "cpu": {
        "scale": 3,
        "resources": {},
    },
    "gpu": {
        "scale": 1,
        "resources": {
            "CUDA_VISIBLE_DEVICES": [0],
        },
    },
}

# Fraction of system memory shared out among the workers; the remainder is
# left as headroom to avoid crashing the machine.
MEMORY_FRACTION = 0.9

worker_count = sum(spec["scale"] for spec in specs.values())
if worker_count <= 0:
    # Fail with a clear message instead of a bare ZeroDivisionError below.
    raise ValueError(
        "specs must request at least one worker (sum of 'scale' is {})".format(worker_count)
    )

# Split the CPUs evenly. Integer division yields 0 when there are more workers
# than CPUs, so clamp to at least one thread per worker.
nthreads = max(1, cpu_count // worker_count)
memory_limit = int(memory_count * MEMORY_FRACTION) // worker_count

print("number of workers:", worker_count)
print("threads per worker:", nthreads)
print("memory limit per worker:", round(memory_limit / 1024**3, 2), "GB")

number of workers: 4
threads per worker: 4
memory limit per worker: 28.24 GB


In [4]:
# Inspect the spec dask_cuda generates for a single GPU worker on device 0.
# NOTE(review): in the recorded output the returned 'memory_limit' equals the
# total system memory rather than the value passed here, so the argument does
# not appear to take effect -- confirm against the installed dask_cuda version.
worker_spec(
    CUDA_VISIBLE_DEVICES=[0],
    memory_limit=memory_limit,
    threads_per_worker=nthreads,
)

{0: {'cls': distributed.nanny.Nanny,
  'options': {'env': {'CUDA_VISIBLE_DEVICES': '0'},
   'interface': None,
   'protocol': None,
   'nthreads': 4,
   'data': dict,
   'dashboard_address': ':8787',
   'plugins': [<dask_cuda.utils.CPUAffinity at 0x7efcd9eefd00>],
   'silence_logs': True,
   'memory_limit': 134746021888.0,
   'preload': 'dask_cuda.initialize',
   'preload_argv': '--create-cuda-context'}}}

In [5]:
# Build the worker mapping handed to SpecCluster:
#   "<type>-<index>" -> {"cls": <worker class>, "options": {...}}
workers = {}

for k, v in specs.items():
    for i in range(v["scale"]):
        name = "{}-{}".format(k, i)
        if "CUDA_VISIBLE_DEVICES" in v["resources"]:
            # GPU worker: let dask_cuda generate the options (env, plugins, preload).
            devices = v["resources"]["CUDA_VISIBLE_DEVICES"]
            generated = worker_spec(
                threads_per_worker=nthreads,
                CUDA_VISIBLE_DEVICES=devices,
            )
            # Take the first generated entry whatever its key is. The key was 0
            # for the device list [0]; presumably it follows the device ID, so a
            # literal [0] lookup would break for other lists -- TODO confirm.
            worker = next(iter(generated.values()))
            # Advertise an abstract "GPU" resource so tasks can be pinned to this worker.
            worker["options"]["resources"] = {"GPU": len(devices)}
            # Override the generated memory limit with the per-worker share.
            worker["options"]["memory_limit"] = memory_limit
        else:
            # CPU worker: a plain Nanny with the shared thread/memory budget.
            worker = {
                "cls": Nanny,
                "options": {
                    "nthreads": nthreads,
                    "memory_limit": memory_limit,
                },
            }
        workers[name] = worker

workers

{'cpu-0': {'cls': distributed.nanny.Nanny,
  'options': {'nthreads': 4, 'memory_limit': 30317854924}},
 'cpu-1': {'cls': distributed.nanny.Nanny,
  'options': {'nthreads': 4, 'memory_limit': 30317854924}},
 'cpu-2': {'cls': distributed.nanny.Nanny,
  'options': {'nthreads': 4, 'memory_limit': 30317854924}},
 'gpu-0': {'cls': distributed.nanny.Nanny,
  'options': {'env': {'CUDA_VISIBLE_DEVICES': '0'},
   'interface': None,
   'protocol': None,
   'nthreads': 4,
   'data': dict,
   'dashboard_address': ':8787',
   'plugins': [<dask_cuda.utils.CPUAffinity at 0x7efbb0b83430>],
   'silence_logs': True,
   'memory_limit': 30317854924,
   'preload': 'dask_cuda.initialize',
   'preload_argv': '--create-cuda-context',
   'resources': {'GPU': 1}}}}

In [6]:
# Describe the scheduler, launch a SpecCluster from the scheduler spec and the
# worker mapping, then connect a client to it. The client is the cell's output.
scheduler = dict(cls=Scheduler, options=dict(dashboard_address=":8787"))
cluster = SpecCluster(workers=workers, scheduler=scheduler)
client = Client(cluster)
client

2023-04-14 16:45:40,874 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
Perhaps you already have a cluster running?
Hosting the HTTP server on port 36279 instead
2023-04-14 16:45:40,899 - distributed.scheduler - INFO - State start
2023-04-14 16:45:40,904 - distributed.scheduler - INFO -   Scheduler at: tcp://129.67.90.167:33471
2023-04-14 16:45:40,904 - distributed.scheduler - INFO -   dashboard at:                    :36279
2023-04-14 16:45:40,940 - distributed.nanny - INFO -         Start Nanny at: 'tcp://129.67.90.167:41083'
2023-04-14 16:45:40,944 - distributed.nanny - INFO -         Start Nanny at: 'tcp://129.67.90.167:35539'
2023-04-14 16:45:40,948 - distributed.nanny - INFO -         Start Nanny at: 'tcp://129.67.90.167:44937'
2023-04-14 16:45:40,953 - distributed.nanny - INFO -         Start Nanny at: 'tcp://129.67.90.167:46533'
2023-04-14 16:45:41,232 - distributed.disk

0,1
Connection method: Cluster object,Cluster type: distributed.SpecCluster
Dashboard: http://129.67.90.167:36279/status,

0,1
Dashboard: http://129.67.90.167:36279/status,Workers: 4
Total threads: 16,Total memory: 112.94 GiB

0,1
Comm: tcp://129.67.90.167:33471,Workers: 4
Dashboard: http://129.67.90.167:36279/status,Total threads: 16
Started: Just now,Total memory: 112.94 GiB

0,1
Comm: tcp://129.67.90.167:44155,Total threads: 4
Dashboard: http://129.67.90.167:36805/status,Memory: 28.24 GiB
Nanny: tcp://129.67.90.167:35539,
Local directory: /tmp/dask-worker-space/worker-ga9mfvx4,Local directory: /tmp/dask-worker-space/worker-ga9mfvx4
GPU: Quadro RTX 4000,GPU memory: 8.00 GiB

0,1
Comm: tcp://129.67.90.167:34405,Total threads: 4
Dashboard: http://129.67.90.167:38589/status,Memory: 28.24 GiB
Nanny: tcp://129.67.90.167:46533,
Local directory: /tmp/dask-worker-space/worker-c5t470dt,Local directory: /tmp/dask-worker-space/worker-c5t470dt
GPU: Quadro RTX 4000,GPU memory: 8.00 GiB

0,1
Comm: tcp://129.67.90.167:35529,Total threads: 4
Dashboard: http://129.67.90.167:45219/status,Memory: 28.24 GiB
Nanny: tcp://129.67.90.167:44937,
Local directory: /tmp/dask-worker-space/worker-2r7n29wp,Local directory: /tmp/dask-worker-space/worker-2r7n29wp
GPU: Quadro RTX 4000,GPU memory: 8.00 GiB

0,1
Comm: tcp://129.67.90.167:41215,Total threads: 4
Dashboard: http://129.67.90.167:36451/status,Memory: 28.24 GiB
Nanny: tcp://129.67.90.167:41083,
Local directory: /tmp/dask-worker-space/worker-_onhnia6,Local directory: /tmp/dask-worker-space/worker-_onhnia6
GPU: Quadro RTX 4000,GPU memory: 8.00 GiB


In [7]:
# Show the cluster's live worker objects, keyed by the names used in the worker mapping.
client.cluster.workers

{'gpu-0': <Nanny: tcp://129.67.90.167:41215, threads: 4>,
 'cpu-0': <Nanny: tcp://129.67.90.167:44155, threads: 4>,
 'cpu-2': <Nanny: tcp://129.67.90.167:35529, threads: 4>,
 'cpu-1': <Nanny: tcp://129.67.90.167:34405, threads: 4>}

In [8]:
def inc(x):
    """Return ``x`` plus one.

    Used as a minimal task for checking that work is routed to the intended worker.
    """
    incremented = x + 1
    return incremented

In [9]:
# Wrap inc as a delayed callable and build the task inc(1); it is executed by
# the dask.compute call further down.
lazy_inc = dask.delayed(inc)
x = lazy_inc(1)

In [10]:
# Pin the computation to a worker that advertises the abstract "GPU" resource
# (only the gpu-* workers declare it in their options).
# dask.compute returns a tuple, so the value itself is res[0].
gpu_requirement = {"GPU": 1}
with dask.annotate(resources=gpu_requirement):
    res = dask.compute(x)