In [1]:
# ! pip install flowcept[dask]

In [2]:
! python reset_dask_nb_exec_counts.py   
! rm -f output.log
# Workaround: starting the Dask cluster triggers a bug in this notebook.
# The command above resets the notebook's execution counts to fix it.
# Save the notebook (Cmd+S) after you run the Dask cluster setup.

In [3]:
# Sleeps are used because these notebooks are being tested automatically as part of the CI/CD. 
# In a normal user interaction, these sleeps would not be necessary.
from time import sleep

In [4]:
def dummy_func1(x):
    """Return ``x`` multiplied by 2 (first task of the submit-based workflow)."""
    doubled = x * 2
    return doubled


def dummy_func2(y):
    """Return ``y`` added to itself (second task of the submit-based workflow)."""
    summed = y + y
    return summed


def calculate_batch_and_epochs(z, w):
    """Derive toy ``batch_size`` and ``epochs`` values from two numbers.

    Args:
        z: Numerator of the epoch ratio; also added into the batch size.
        w: Denominator of the epoch ratio; also added into the batch size.

    Returns:
        dict: ``{"batch_size": int(z + w + 16), "epochs": max(int(z / w) + 1, 2)}``.
        When ``w`` is zero the ratio is undefined, so ``epochs`` falls back to
        its floor of 2 instead of raising ``ZeroDivisionError``.
    """
    # The notebook feeds this function values derived from np.random.random(),
    # which samples [0.0, 1.0); an input of exactly 0.0 makes w == 0, so the
    # division must be guarded.
    ratio_epochs = int(z / w) + 1 if w else 0
    return {
        "batch_size": int(z + w + 16),
        "epochs": max(ratio_epochs, 2),
    }

### Set the env var pointing to the conf file where the ports, hostnames, and other conf variables are read from.

An example conf file is available in the `resources` directory of the Flowcept repository. You can use it as is if you are running this notebook on your local laptop.

In [5]:
def setup_local_dask_cluster():
    """Start a two-worker local Dask cluster with Flowcept's worker adapter registered.

    Returns:
        tuple: ``(client, cluster)`` - the ``Client`` connected to the new
        cluster's scheduler, and the ``LocalCluster`` itself. The caller is
        responsible for closing both.
    """
    # Imports stay inside the function, in the same order as before.
    from dask.distributed import Client, LocalCluster
    from flowcept import FlowceptDaskWorkerAdapter

    local_cluster = LocalCluster(n_workers=2)
    # Connect through the scheduler's address rather than the cluster object.
    connected_client = Client(local_cluster.scheduler.address)

    # Attach the Flowcept worker adapter as a plugin via the client.
    connected_client.register_plugin(FlowceptDaskWorkerAdapter())

    return connected_client, local_cluster

## Start Local Dask Cluster

In [6]:
# Start the local cluster and keep both handles; they are closed explicitly near the end of the notebook.
dask_client, dask_cluster = setup_local_dask_cluster()
# Bare last expression: the notebook renders the client's summary as the cell output.
dask_client

0,1
Connection method: Direct,
Dashboard: http://127.0.0.1:8787/status,

0,1
Comm: tcp://127.0.0.1:59452,Workers: 2
Dashboard: http://127.0.0.1:8787/status,Total threads: 10
Started: Just now,Total memory: 16.00 GiB

0,1
Comm: tcp://127.0.0.1:59460,Total threads: 5
Dashboard: http://127.0.0.1:59461/status,Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:59455,
Local directory: /var/folders/jx/23j21rtx1czb2tpqht16pz907m8f48/T/dask-scratch-space/worker-huaqc_r1,Local directory: /var/folders/jx/23j21rtx1czb2tpqht16pz907m8f48/T/dask-scratch-space/worker-huaqc_r1
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 65.41 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B

0,1
Comm: tcp://127.0.0.1:59459,Total threads: 5
Dashboard: http://127.0.0.1:59462/status,Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:59456,
Local directory: /var/folders/jx/23j21rtx1czb2tpqht16pz907m8f48/T/dask-scratch-space/worker-7lgyrkf0,Local directory: /var/folders/jx/23j21rtx1czb2tpqht16pz907m8f48/T/dask-scratch-space/worker-7lgyrkf0
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 65.03 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B


## Start Flowcept's Consumer

In [7]:
from flowcept import Flowcept
from flowcept.flowceptor.adapters.dask.dask_plugins import register_dask_workflow

# Instantiate Flowcept with the 'dask' argument (presumably selecting the Dask adapter - confirm in the Flowcept docs).
flowcept = Flowcept('dask')
# start() is the cell's last expression, so its return value is displayed as the cell output.
flowcept.start()

<flowcept.flowcept_api.flowcept_controller.Flowcept at 0x13e30bcd0>

In [8]:
# Register a workflow on the Dask client and keep the returned id; it is used later to filter the database query.
submit_based_wf_id = register_dask_workflow(dask_client)
print(f"Workflow_Id={submit_based_wf_id}")

Workflow_Id=9eafccb1-bfb7-4545-9d03-b50ef58e59b9


In [9]:
# Bare expression: shows the workflow id as the cell's output value.
submit_based_wf_id

'9eafccb1-bfb7-4545-9d03-b50ef58e59b9'

## Client.Submit-based Workflow

In [10]:
import numpy as np
# Draw a single random input in [0.0, 1.0) for the first task.
i1 = np.random.random()
# Chain three tasks; the futures o1 and o2 are passed as arguments to the dependent tasks.
o1 = dask_client.submit(dummy_func1, i1)
o2 = dask_client.submit(dummy_func2, o1)
o3 = dask_client.submit(calculate_batch_and_epochs, o1, o2)
print(f"Task3_id={o3.key}")
# result() waits for the final task and returns its value.
print(f"Result={o3.result()}")

Task3_id=calculate_batch_and_epochs-9aa319d6169f3543f4475cda79d8e674
Result={'batch_size': 19, 'epochs': 2}


## Map-based Workflow

In [11]:
def incr(n):
    """Return ``n`` plus one (the per-element task of the map-based workflow)."""
    successor = n + 1
    return successor

# Register a separate workflow for the map-based run; its id is used later to filter the database query.
map_based_wf_id = register_dask_workflow(dask_client)
# Map incr over 1000 inputs, then gather the results of all futures into a list.
futures = dask_client.map(incr, range(1000))
results = dask_client.gather(futures)
print(len(results))

1000


In [12]:
## Stopping Flowcept and Dask cluster

In [13]:
# Shut down in order: stop Flowcept first, then close the Dask client, then the cluster.
flowcept.stop()
dask_client.close()
dask_cluster.close()

## Query the database

In [14]:
# Query the database for the tasks of the submit-based workflow, filtering by its workflow id.
_filter = {"workflow_id": submit_based_wf_id}
tasks = Flowcept.db.query(_filter)
# NOTE(review): this displays every returned task, including its telemetry, so the cell output is long.
tasks

[{'task_id': 'dummy_func1-5e84c8b4f22d040d06e27e974b3ca518',
  'telemetry_at_start': {'cpu': {'times_avg': {'user': 101264.8,
     'nice': 0.0,
     'system': 63534.75,
     'idle': 734514.32},
    'percent_all': 42.8,
    'frequency': 3228,
    'times_per_cpu': [{'user': 29830.3,
      'nice': 0.0,
      'system': 21851.35,
      'idle': 37961.42},
     {'user': 29605.94, 'nice': 0.0, 'system': 21243.65, 'idle': 38810.63},
     {'user': 13673.04, 'nice': 0.0, 'system': 7297.7, 'idle': 68800.31},
     {'user': 9684.72, 'nice': 0.0, 'system': 4599.93, 'idle': 75617.85},
     {'user': 5867.09, 'nice': 0.0, 'system': 2919.26, 'idle': 81194.67},
     {'user': 4189.16, 'nice': 0.0, 'system': 2154.86, 'idle': 83674.14},
     {'user': 3361.5, 'nice': 0.0, 'system': 1389.04, 'idle': 85311.51},
     {'user': 2173.14, 'nice': 0.0, 'system': 864.88, 'idle': 87043.72},
     {'user': 1560.71, 'nice': 0.0, 'system': 654.31, 'idle': 87879.55},
     {'user': 1319.2, 'nice': 0.0, 'system': 559.77, 'idl

In [15]:
# Pause before the next query; presumably this gives the map-based tasks time to reach the database - TODO confirm.
sleep(10)

In [16]:
# Query the tasks of the map-based workflow and check that the number of returned tasks matches the number of results.
_filter = {"workflow_id": map_based_wf_id}
tasks = Flowcept.db.query(_filter)
assert len(tasks) == len(results)

In [17]:
# Bare expression: shows how many tasks the last query returned.
len(tasks)

1000

In [18]:
# End the session here. NOTE(review): presumably needed for the automated CI/CD run of this notebook - confirm.
exit()