# Parallel Execution of a Long-Running Function with Dask

This notebook demonstrates how to define a long-running function and run it multiple times in parallel using Dask.

In [1]:
from dask.distributed import Client, LocalCluster
import time

# Start a local Dask cluster

cluster = LocalCluster(n_workers=6, threads_per_worker=1, memory_limit='4GB')
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 6
Total threads: 6,Total memory: 22.35 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:53950,Workers: 0
Dashboard: http://127.0.0.1:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B

0,1
Comm: tcp://127.0.0.1:53995,Total threads: 1
Dashboard: http://127.0.0.1:54000/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:53959,
Local directory: C:\Users\cab\AppData\Local\Temp\dask-scratch-space\worker-s1mw5a13,Local directory: C:\Users\cab\AppData\Local\Temp\dask-scratch-space\worker-s1mw5a13

0,1
Comm: tcp://127.0.0.1:53993,Total threads: 1
Dashboard: http://127.0.0.1:53997/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:53961,
Local directory: C:\Users\cab\AppData\Local\Temp\dask-scratch-space\worker-p40q_4z8,Local directory: C:\Users\cab\AppData\Local\Temp\dask-scratch-space\worker-p40q_4z8

0,1
Comm: tcp://127.0.0.1:54008,Total threads: 1
Dashboard: http://127.0.0.1:54009/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:53963,
Local directory: C:\Users\cab\AppData\Local\Temp\dask-scratch-space\worker-4doi_dw7,Local directory: C:\Users\cab\AppData\Local\Temp\dask-scratch-space\worker-4doi_dw7

0,1
Comm: tcp://127.0.0.1:54002,Total threads: 1
Dashboard: http://127.0.0.1:54004/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:53965,
Local directory: C:\Users\cab\AppData\Local\Temp\dask-scratch-space\worker-b6_wlo4e,Local directory: C:\Users\cab\AppData\Local\Temp\dask-scratch-space\worker-b6_wlo4e

0,1
Comm: tcp://127.0.0.1:53994,Total threads: 1
Dashboard: http://127.0.0.1:53996/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:53967,
Local directory: C:\Users\cab\AppData\Local\Temp\dask-scratch-space\worker-x71ws1am,Local directory: C:\Users\cab\AppData\Local\Temp\dask-scratch-space\worker-x71ws1am

0,1
Comm: tcp://127.0.0.1:54003,Total threads: 1
Dashboard: http://127.0.0.1:54006/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:53969,
Local directory: C:\Users\cab\AppData\Local\Temp\dask-scratch-space\worker-msg0aadq,Local directory: C:\Users\cab\AppData\Local\Temp\dask-scratch-space\worker-msg0aadq


2025-06-12 12:37:18,437 - distributed.dashboard.components.shared - ERROR - list index out of range
Traceback (most recent call last):
  File "c:\sourcerepos\ray-dask-testing\.venv\Lib\site-packages\distributed\utils.py", line 824, in wrapper
    return func(*args, **kwargs)
  File "c:\sourcerepos\ray-dask-testing\.venv\Lib\site-packages\distributed\dashboard\components\shared.py", line 206, in cb
    data = profile.plot_data(self.states[new[0]], profile_interval)
                             ~~~~~~~~~~~^^^^^^^^
IndexError: list index out of range
2025-06-12 12:37:18,459 - bokeh.server.protocol_handler - ERROR - error handling message
 message: Message 'PATCH-DOC' content: {'events': [{'kind': 'ModelChanged', 'model': {'id': 'p1951'}, 'attr': 'indices', 'new': [8]}]} 
 error: IndexError('list index out of range')
Traceback (most recent call last):
  File "c:\sourcerepos\ray-dask-testing\.venv\Lib\site-packages\bokeh\server\protocol_handler.py", line 94, in handle
    work = await handl

In [2]:
# Define a long-running function
def long_running_task(x):
    print(f"Starting task {x}")
    time.sleep(30)  # Simulate a long computation
    print(f"Finished task {x}")
    return x * x

In [8]:
from dask import delayed

# Wrap the function calls with dask.delayed
tasks = [delayed(long_running_task)(i) for i in range(100)]

In [13]:
100*30/60/14

3.5714285714285716

In [4]:
# Compute the results in parallel
results = client.compute(tasks, sync=True)
results

[0,
 1,
 4,
 9,
 16,
 25,
 36,
 49,
 64,
 81,
 100,
 121,
 144,
 169,
 196,
 225,
 256,
 289,
 324,
 361,
 400,
 441,
 484,
 529,
 576,
 625,
 676,
 729,
 784,
 841,
 900,
 961,
 1024,
 1089,
 1156,
 1225,
 1296,
 1369,
 1444,
 1521,
 1600,
 1681,
 1764,
 1849,
 1936,
 2025,
 2116,
 2209,
 2304,
 2401,
 2500,
 2601,
 2704,
 2809,
 2916,
 3025,
 3136,
 3249,
 3364,
 3481,
 3600,
 3721,
 3844,
 3969,
 4096,
 4225,
 4356,
 4489,
 4624,
 4761,
 4900,
 5041,
 5184,
 5329,
 5476,
 5625,
 5776,
 5929,
 6084,
 6241,
 6400,
 6561,
 6724,
 6889,
 7056,
 7225,
 7396,
 7569,
 7744,
 7921,
 8100,
 8281,
 8464,
 8649,
 8836,
 9025,
 9216,
 9409,
 9604,
 9801]

In [9]:
# Compute the results in parallel
results = client.compute(tasks, sync=False)
results

[<Future: pending, key: long_running_task-fbed9550-cf85-4cbd-b02b-f44e8611eebe>,
 <Future: pending, key: long_running_task-953eb39b-8e28-4016-bc00-e15b3c9cf4ea>,
 <Future: pending, key: long_running_task-98980f85-baa9-4fa5-b38d-9abe8dccc244>,
 <Future: pending, key: long_running_task-fe6d59b1-5511-4819-8b4c-77a086ffbe0f>,
 <Future: pending, key: long_running_task-311a728e-2a9c-40cb-829a-e455e1cd62b5>,
 <Future: pending, key: long_running_task-077f9bbe-d1cd-4b8c-a7b8-ef70d9381a07>,
 <Future: pending, key: long_running_task-8e3cc515-a6b7-404c-9a34-0c792db4e477>,
 <Future: pending, key: long_running_task-192c5c8d-a5ea-4c43-bc0e-fa3d88134c23>,
 <Future: pending, key: long_running_task-7b8edaf2-3d5b-45f5-b4ec-3c5f21b0b3c6>,
 <Future: pending, key: long_running_task-38a61206-60ff-4725-acaf-bd2adb77a7bf>,
 <Future: pending, key: long_running_task-57904034-0e1b-417b-b815-d8720592ac60>,
 <Future: pending, key: long_running_task-8d59aaa7-f697-4576-a34c-78f99974788f>,
 <Future: pending, key: long

In [9]:
# Optionally, close the client when done
client.close()