In [5]:
import logging
import time

import numpy as np
from dask.distributed import Client, worker_client
import dask

# Route INFO-and-above records from the notebook process to stderr, and grab
# a named logger for the demo's own messages.
logging.basicConfig(level="INFO")
logger = logging.getLogger(name="Dask Demo")

In [6]:
# Start a local Dask cluster whose workers each advertise `gpus_per_worker`
# units of an abstract "GPU" resource. Tasks submitted with
# resources={"GPU": 1} (as check_divisibility does) are scheduled only onto
# workers with a free unit of that resource.
gpus_per_worker = 2
with dask.config.set({"distributed.worker.resources.GPU": gpus_per_worker}):
    client = Client()

print(f"Connected to Dask at: {client.dashboard_link}")

Connected to Dask at: http://127.0.0.1:8787/status


2025-11-29 14:30:36,004 - tornado.application - ERROR - Uncaught exception GET /status/ws (127.0.0.1)
HTTPServerRequest(protocol='http', host='127.0.0.1:8787', method='GET', uri='/status/ws', version='HTTP/1.1', remote_ip='127.0.0.1')
Traceback (most recent call last):
  File "/Users/jakub/workspace/personal/python-concurrency-fnspe-2025/.venv/lib/python3.13/site-packages/tornado/websocket.py", line 965, in _accept_connection
    open_result = handler.open(*handler.open_args, **handler.open_kwargs)
  File "/Users/jakub/workspace/personal/python-concurrency-fnspe-2025/.venv/lib/python3.13/site-packages/tornado/web.py", line 3375, in wrapper
    return method(self, *args, **kwargs)
  File "/Users/jakub/workspace/personal/python-concurrency-fnspe-2025/.venv/lib/python3.13/site-packages/bokeh/server/views/ws.py", line 149, in open
    raise ProtocolError("Token is expired. Configure the app with a larger value for --session-token-expiration if necessary")
bokeh.protocol.exceptions.Protocol

In [57]:
# Knobs for the simulated workload:
#   slowness     - multiplier for the fake sleeps in load_data / check_divisibility_single
#   failure_rate - probability that a single check_divisibility_single call raises
#   retries      - how many times Dask retries a failed check_divisibility_single task
slowness, failure_rate, retries = 0.3, 0.2, 5

In [58]:
def load_data(size: int, latency_s: float = slowness) -> np.ndarray:
    # Pretend we're fetching from remote storage
    time.sleep(latency_s * 5)
    return np.arange(size, dtype=np.int64)


def check_divisibility_single(data: np.ndarray, by: int) -> np.ndarray:
    """Slow, flaky elementwise test of ``data % by == 0``.

    With probability ``failure_rate`` the call logs a warning and raises
    ``ValueError`` (to exercise Dask's retry machinery). Otherwise it sleeps
    for a random time in ``[0, by * slowness)`` seconds and returns the mask.
    """
    failed = np.random.random() < failure_rate
    if failed:
        message = "Random failure occurred!"
        logger.warning(message)
        raise ValueError(message)
    # Random sleep to simulate work, proportional to the divisor.
    delay = by * slowness * np.random.random()
    time.sleep(delay)
    remainder = data % by
    return remainder == 0


def check_divisibility(data: np.ndarray, divisors: np.ndarray) -> np.ndarray:
    """Return a mask of the ``data`` elements divisible by every divisor.

    Intended to run on a Dask worker: it opens a ``worker_client`` and submits
    one ``check_divisibility_single`` task per divisor, each tagged with
    ``resources={"GPU": 1}`` and allowed up to ``retries`` retries. It then
    ANDs the per-divisor masks together.

    Args:
        data: Values to test.
        divisors: Divisors to test ``data`` against.

    Returns:
        Boolean array shaped like ``data``. With no divisors the condition
        holds vacuously, so the mask is all ``True``.
    """
    if len(divisors) == 0:
        # np.logical_and.reduce([]) would return a 0-d ``True`` instead of a
        # per-element mask, which breaks callers that concatenate the masks.
        return np.ones(np.shape(data), dtype=bool)
    with worker_client() as wc:  # Get Dask worker client
        part_futs = [  # Submit tasks to Dask cluster
            wc.submit(
                check_divisibility_single,
                data,
                by,
                resources={"GPU": 1},  # Tag task with GPU resource
                # check_divisibility_single fails at random; retry it
                retries=retries,
            )
            for by in divisors
        ]
        parts = wc.gather(part_futs)  # Gather results from Dask cluster
    return np.logical_and.reduce(parts)


def is_divisible_by(
    data: np.ndarray, divisors: np.ndarray, num_chunks: int = 4
) -> np.ndarray:
    """Select the elements of ``data`` that are divisible by every divisor.

    ``data`` is split along axis 0 into chunks. Each chunk is checked by a
    ``check_divisibility`` task submitted from this worker via
    ``worker_client``, and the per-chunk masks are concatenated back together.

    Args:
        data: Data to check divisibility of.
        divisors: Divisors to check divisibility by.
        num_chunks: Maximum number of chunks to split ``data`` into. It is
            capped at ``len(data)`` so no task is submitted for an empty chunk.

    Returns:
        The elements of ``data`` divisible by all ``divisors`` (a new array,
        in original order). With no divisors, every element is returned.

    Raises:
        ValueError: If ``num_chunks`` is not positive (from ``np.array_split``).
    """
    # Split locally, before seceding into worker_client; this also validates
    # num_chunks. Cap at len(data) (but at least 1) to avoid empty chunks.
    chunks = np.array_split(data, min(num_chunks, max(len(data), 1)))
    if len(divisors) == 0:
        # Vacuously true for every element; skip the cluster round-trip.
        return data[np.ones(np.shape(data), dtype=bool)]
    # Get Dask worker client
    with worker_client() as wc:
        # Submit tasks to Dask cluster (non-blocking)
        part_futs = [wc.submit(check_divisibility, c, divisors) for c in chunks]
        # Gather results from Dask cluster (blocking)
        parts = wc.gather(part_futs)
    return data[np.concatenate(parts, axis=0)]

In [59]:
# Problem definition: load `size` values, split them into `num_chunks`
# chunks, and keep the values divisible by every entry of `divisors`.
size, num_chunks = 10_000, 10
divisors = np.array((3, 5, 7))

In [60]:
# 1) Load data remotely -> Future
# client.submit returns a Future immediately; load_data runs on a cluster
# worker, and its result stays on the cluster until the Future is gathered
# or passed into another task.
data_future = client.submit(load_data, size)

In [61]:
# 2) Pass the future + extra arg into a function that
# splits work via worker_client
# Passing data_future (not the array itself) lets Dask feed load_data's
# result into is_divisible_by on the cluster, without routing the array
# through this notebook process.
result_future = client.submit(
    is_divisible_by, data_future, divisors=divisors, num_chunks=num_chunks
)
result_future

Random failure occurred!
2025-11-29 14:29:52,477 - distributed.worker - ERROR - Compute Failed
Key:       check_divisibility_single-13f3c3cd33b2377995713461f4ceaf67
State:     executing
Task:  <Task 'check_divisibility_single-13f3c3cd33b2377995713461f4ceaf67' check_divisibility_single(...)>
Exception: "ValueError('Random failure occurred!')"
Traceback: '  File "/var/folders/3b/2v5lybv50v5_wq2kw2886gr40000gn/T/ipykernel_17875/3651060410.py", line 10, in check_divisibility_single\n'

Random failure occurred!
2025-11-29 14:29:52,483 - distributed.worker - ERROR - Compute Failed
Key:       check_divisibility_single-bcac07015d86d2df8a54ce80ce89da02
State:     executing
Task:  <Task 'check_divisibility_single-bcac07015d86d2df8a54ce80ce89da02' check_divisibility_single(...)>
Exception: "ValueError('Random failure occurred!')"
Traceback: '  File "/var/folders/3b/2v5lybv50v5_wq2kw2886gr40000gn/T/ipykernel_17875/3651060410.py", line 10, in check_divisibility_single\n'

Random failure occurred!
R

In [62]:
# 3) Wait for the result (blocking)
result: np.ndarray = result_future.result()
# Sanity check: every returned value must be divisible by every divisor.
assert np.all(result[:, np.newaxis] % divisors == 0), "non-divisible value in result"
result

array([   0,  105,  210,  315,  420,  525,  630,  735,  840,  945, 1050,
       1155, 1260, 1365, 1470, 1575, 1680, 1785, 1890, 1995, 2100, 2205,
       2310, 2415, 2520, 2625, 2730, 2835, 2940, 3045, 3150, 3255, 3360,
       3465, 3570, 3675, 3780, 3885, 3990, 4095, 4200, 4305, 4410, 4515,
       4620, 4725, 4830, 4935, 5040, 5145, 5250, 5355, 5460, 5565, 5670,
       5775, 5880, 5985, 6090, 6195, 6300, 6405, 6510, 6615, 6720, 6825,
       6930, 7035, 7140, 7245, 7350, 7455, 7560, 7665, 7770, 7875, 7980,
       8085, 8190, 8295, 8400, 8505, 8610, 8715, 8820, 8925, 9030, 9135,
       9240, 9345, 9450, 9555, 9660, 9765, 9870, 9975])