In [1]:
import os
import time
import cdsw
import dask

dask_scheduler = cdsw.launch_workers(
    n=1,
    cpu=2,
    memory=2,
    code=f"!dask-scheduler --host 0.0.0.0 --dashboard-address 127.0.0.1:8090",
)

# Wait for the scheduler to start.
time.sleep(10)

In [2]:
print("//".join(dask_scheduler[0]["app_url"].split("//")))

http://365pzs0e7skkzbqh.ml-76cf996d-8f4.apps.apps.ocp4.cdpkvm.cldr/


In [3]:
scheduler_workers = cdsw.list_workers()
scheduler_id = dask_scheduler[0]["id"]
scheduler_ip = [
    worker["ip_address"] for worker in scheduler_workers if worker["id"] == scheduler_id
][0]

scheduler_url = f"tcp://{scheduler_ip}:8786"

scheduler_url

'tcp://10.254.0.215:8786'

In [4]:
from dask.distributed import Client
client = Client(scheduler_url)
client

0,1
Connection method: Direct,
Dashboard: http://10.254.0.215:8090/status,

0,1
Comm: tcp://10.254.0.215:8786,Workers: 1
Dashboard: http://10.254.0.215:8090/status,Total threads: 16
Started: 4 minutes ago,Total memory: 14.81 GiB

0,1
Comm: tcp://10.254.3.233:35955,Total threads: 16
Dashboard: http://10.254.3.233:39221/status,Memory: 14.81 GiB
Nanny: tcp://10.254.3.233:37997,
Local directory: /home/cdsw/dask-worker-space/worker-435lk0kz,Local directory: /home/cdsw/dask-worker-space/worker-435lk0kz
GPU: NVIDIA A100-PCIE-40GB,GPU memory: 39.59 GiB
Tasks executing: 0,Tasks in memory: 0
Tasks ready: 0,Tasks in flight: 0
CPU usage: 8.0%,Last seen: Just now
Memory usage: 130.96 MiB,Spilled bytes: 0 B
Read bytes: 9.79 kiB,Write bytes: 10.60 kiB


In [5]:
import dask.array as da
rs = da.random.RandomState()
x = rs.normal(10, 1, size=(10000, 10000), chunks=(1000, 1000))
x

Unnamed: 0,Array,Chunk
Bytes,762.94 MiB,7.63 MiB
Shape,"(10000, 10000)","(1000, 1000)"
Count,100 Tasks,100 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 762.94 MiB 7.63 MiB Shape (10000, 10000) (1000, 1000) Count 100 Tasks 100 Chunks Type float64 numpy.ndarray",10000  10000,

Unnamed: 0,Array,Chunk
Bytes,762.94 MiB,7.63 MiB
Shape,"(10000, 10000)","(1000, 1000)"
Count,100 Tasks,100 Chunks
Type,float64,numpy.ndarray


In [6]:
start = time.time()
total = (x + 1)[::2, ::2].sum().compute()
end = time.time()
print("Result:", total)
print("Time Taken:{}".format(end - start))

Result: 275011123.9345571
Time Taken:20.333498001098633


In [7]:
start = time.time()
total = (x + 1)[::2, ::2].sum().compute(num_workers=9)
end = time.time()
print("Result:", total)
print("Time Taken:{}".format(end - start))

Result: 275011123.9345571
Time Taken:17.990230798721313


In [8]:
start = time.time()
total = (x + 1)[::2, ::2].sum().compute(num_workers=1)
end = time.time()
print("Result:", total)
print("Time Taken:{}".format(end - start))

Result: 275011123.9345571
Time Taken:26.49886441230774


In [9]:
start = time.time()
total = (x + 1)[::2, ::2].sum().compute(scheduler='single-threaded')
end = time.time()
print("Result:", total)
print("Time Taken:{}".format(end - start))

Result: 275011123.9345571
Time Taken:4.032877683639526


In [10]:
start = time.time()
total = (x + 1)[::2, ::2].sum().compute(scheduler='threads')
end = time.time()
print("Result:", total)
print("Time Taken:{}".format(end - start))

Result: 275011123.9345571
Time Taken:0.6311771869659424


In [11]:
import cupy
rs = da.random.RandomState(RandomState=cupy.random.RandomState)
x = rs.normal(10, 1, size=(10000, 10000), chunks=(1000, 1000))
x

start = time.time()
total = (x + 1)[::2, ::2].sum().compute(scheduler='single-threaded')
end = time.time()
print("Result:", total)
print("Time Taken:{}".format(end - start))

Result: 274998139.072111
Time Taken:1.2028133869171143


In [12]:
more_worker = 2
dask_workers = cdsw.launch_workers(
    n=more_worker,
    cpu=2,
    memory=8,
    code=f"!dask-worker {scheduler_url}",
)

# Wait for the workers to start.
time.sleep(10)

In [13]:
client

0,1
Connection method: Direct,
Dashboard: http://10.254.0.215:8090/status,

0,1
Comm: tcp://10.254.0.215:8786,Workers: 3
Dashboard: http://10.254.0.215:8090/status,Total threads: 64
Started: 10 minutes ago,Total memory: 29.52 GiB

0,1
Comm: tcp://10.254.0.218:37905,Total threads: 24
Dashboard: http://10.254.0.218:34401/status,Memory: 7.36 GiB
Nanny: tcp://10.254.0.218:38803,
Local directory: /home/cdsw/dask-worker-space/worker-pom7foht,Local directory: /home/cdsw/dask-worker-space/worker-pom7foht
Tasks executing: 0,Tasks in memory: 0
Tasks ready: 0,Tasks in flight: 0
CPU usage: 4.0%,Last seen: Just now
Memory usage: 127.66 MiB,Spilled bytes: 0 B
Read bytes: 2.15 kiB,Write bytes: 2.00 kiB

0,1
Comm: tcp://10.254.2.5:38941,Total threads: 24
Dashboard: http://10.254.2.5:43885/status,Memory: 7.36 GiB
Nanny: tcp://10.254.2.5:41833,
Local directory: /home/cdsw/dask-worker-space/worker-grolipjk,Local directory: /home/cdsw/dask-worker-space/worker-grolipjk
Tasks executing: 0,Tasks in memory: 0
Tasks ready: 0,Tasks in flight: 0
CPU usage: 4.0%,Last seen: Just now
Memory usage: 127.61 MiB,Spilled bytes: 0 B
Read bytes: 417.5593743822537 B,Write bytes: 0.94 kiB

0,1
Comm: tcp://10.254.3.233:35955,Total threads: 16
Dashboard: http://10.254.3.233:39221/status,Memory: 14.81 GiB
Nanny: tcp://10.254.3.233:37997,
Local directory: /home/cdsw/dask-worker-space/worker-435lk0kz,Local directory: /home/cdsw/dask-worker-space/worker-435lk0kz
GPU: NVIDIA A100-PCIE-40GB,GPU memory: 39.59 GiB
Tasks executing: 0,Tasks in memory: 0
Tasks ready: 0,Tasks in flight: 0
CPU usage: 8.0%,Last seen: Just now
Memory usage: 436.68 MiB,Spilled bytes: 0 B
Read bytes: 16.45 kiB,Write bytes: 15.04 kiB


In [14]:
import dask.array as da
rs = da.random.RandomState()
x = rs.normal(10, 1, size=(10000, 10000), chunks=(1000, 1000))
x

start = time.time()
total = (x + 1)[::2, ::2].sum().compute()
end = time.time()
print("Result:", total)
print("Time Taken:{}".format(end - start))

Result: 274998653.37743837
Time Taken:30.304303646087646


In [15]:
start = time.time()
total = (x + 1)[::2, ::2].sum().compute(num_workers=9)
end = time.time()
print("Result:", total)
print("Time Taken:{}".format(end - start))

Result: 274998653.37743837
Time Taken:25.068480253219604


In [16]:
start = time.time()
total = (x + 1)[::2, ::2].sum().compute(scheduler='single-threaded')
end = time.time()
print("Result:", total)
print("Time Taken:{}".format(end - start))

Result: 274998653.37743837
Time Taken:3.8503987789154053


In [17]:
start = time.time()
total = (x + 1)[::2, ::2].sum().compute(scheduler='threads')
end = time.time()
print("Result:", total)
print("Time Taken:{}".format(end - start))

Result: 274998653.37743837
Time Taken:0.45389604568481445


In [18]:
cdsw.stop_workers()

[<Response [204]>, <Response [204]>, <Response [204]>]

distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError
